本文介绍了如何创建使用几个客户端权重的 FL 算法?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

基于此我写一种新的FL算法方式.我训练所有客户端并将所有客户端的模型参数发送到服务器,服务器在聚合过程中只会对所有客户端的30%的模型参数进行加权平均.作为选择 30% 客户端模型参数的标准,我想使用 weights_delta 30% 客户端的loss_sum 客户端进行加权平均.>

下面的代码是针对这个链接.

@tf.functiondef client_update(model, dataset, server_message, client_optimizer):model_weights = model.weightsinitial_weights = server_message.model_weightstff.utils.assign(model_weights, initial_weights)num_examples = tf.constant(0, dtype=tf.int32)loss_sum = tf.constant(0, dtype=tf.float32)对于迭代器(数据集)中的批处理:使用 tf.GradientTape() 作为磁带:输出 = model.forward_pass(batch)grads = tape.gradient(outputs.loss,model_weights.trainable)grads_and_vars = zip(grads,model_weights.trainable)client_optimizer.apply_gradients(grads_and_vars)batch_size = tf.shape(batch['x'])[0]num_examples += batch_sizeloss_sum +=outputs.loss * tf.cast(batch_size, tf.float32)weights_delta = tf.nest.map_structure(lambda a, b: a - b,model_weights.trainable,initial_weights.trainable)client_weight = tf.cast(num_examples, tf.float32)client_loss = loss_sum #add返回 ClientOutput(weights_delta, client_weight, loss_sum/client_weight,client_loss)

client_output

中有如下属性

weights_delta = attr.ib()client_weight = attr.ib()模型输出 = attr.ib()client_loss = attr.ib()

之后,我通过序列的形式制作了client_outputcollected_output = tff.federated_collect(client_output)round_model_delta = tff.federated_map(selecting_fn,(collected_output,weight_denom))

 @tff.federated_computation(federated_server_state_type,federated_dataset_type)def run_one_round(server_state, federated_dataset):server_message = tff.federated_map(server_message_fn, server_state)server_message_at_client = tff.federated_broadcast(server_message)client_outputs = tff.federated_map(client_update_fn, (federated_dataset, server_message_at_client))weight_denom = client_outputs.client_weightcollect_output = tff.federated_collect(client_outputs) # 添加round_model_delta = tff.federated_map(selecting_fn,(collected_output,weight_denom)) #addserver_state = tff.federated_map(server_update_fn,(server_state, round_model_delta))round_loss_metric = tff.federated_mean(client_outputs.model_output, weight=weight_denom)返回 server_state, round_loss_metric

另外,添加如下代码此处 实现selecting_fn 功能.

@tff.tf_computation() # 追加def selection_fn(collected_output,weight_denom):#去做返回round_model_delta

我不确定按照上面的方式编写代码是否正确.我尝试了各种方法,但主要是 TypeError:要映射的值必须是 FederatedType 或隐式转换为 FederatedType(得到一个 <<model_weights=<trainable=<float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,round_num=int32>@SERVER,{<float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10]>}@CLIENTS>) 我收到这个错误.>

我想知道序列类型collected_output如何访问每个客户端的client_loss(= loss_sum)并对它们进行排序,也想知道用weight_denom 已应用.

解决方案

我看到的一个问题是在调用 tff.federated_map(selecting_fn,(collected_output,weight_denom) 时,collected_output 将被放置在 tff.SERVER 并且 weight_denom 将被放置在 tff.CLIENTS,所以这将不起作用.我认为您想先将所有内容都发送到 tff.SERVER.

我不确定您需要什么行为,但这里有一个示例代码,您可以从中开始和开发.它从客户端值(比如它的 ID)开始,采样一个随机值,将 (ID, value) 对收集到服务器,然后选择具有最大值的对 - 看起来与您描述的相似.

@tff.tf_computation()def client_sample_fn():返回 tf.random.uniform((1,))# tff.tf_computation 的类型注释可选.为了清楚起见,在此处添加.idx_sample_type = tff.to_type(((tf.int32, (1,)), (tf.float32, (1,))))@tff.tf_computation(tff.SequenceType(idx_sample_type))def select_fn(idx_sample_dataset): # 在里面,这是一个 tf.data.Dataset.# 连接数据集中的所有对.concat_fn = lambda a, b: tf.concat([a, b], axis=0)reduce_fn = lambda x, y: (concat_fn(x[0], y[0]), concat_fn(x[1], y[1]))reduce_zero = (tf.constant((), dtype=tf.int32, shape=(0,)),tf.constant((), dtype=tf.float32, shape=(0,)))idx_tensor,sample_tensor = idx_sample_dataset.reduce(reduce_zero,reduce_fn)# 找到 3 个最大的样本.top_3_val, top_3_idx = tf.math.top_k(sample_tensor, k=3)返回 tf.gather(idx_tensor, top_3_idx), [email protected]_computation(tff.type_at_clients((tf.int32, (1,))))def fed_fn(client_idx):client_sample = tff.federated_eval(client_sample_fn,tff.CLIENTS)# 第一个 zip,要有一个成对的数据集,而不是一对数据集.client_idx_sample_pair = tff.federated_zip((client_idx, client_sample))collect_idx_sample_pair = tff.federated_collect(client_idx_sample_pair)返回 tff.federated_map(select_fn,collected_idx_sample_pair)client_idx = [(10,), (11,), (12,), (13,), (14,)]fed_fn(client_idx)

使用示例输出:

(array([11, 10, 14], dtype=int32), array([0.8220736, 0.81413555, 0.6984291], dtype=float32))

Based on this link I am trying to write a new way of FL algorithm. I train all clients and send the model parameters of all clients to the server, and the server will weight average only the model parameters of 30% of all clients during the aggregation process. As a criterion for selecting model parameters of 30% of clients, I want to do a weighted average by using weights_delta of 30% of clients with less loss_sum of clients.

The code below is a modified code for this link.

@tf.function
def client_update(model, dataset, server_message, client_optimizer):

model_weights = model.weights
initial_weights = server_message.model_weights
tff.utils.assign(model_weights, initial_weights)

num_examples = tf.constant(0, dtype=tf.int32)
loss_sum = tf.constant(0, dtype=tf.float32)

for batch in iter(dataset):
    with tf.GradientTape() as tape:
        outputs = model.forward_pass(batch)
    grads = tape.gradient(outputs.loss, model_weights.trainable)
    grads_and_vars = zip(grads, model_weights.trainable)
    client_optimizer.apply_gradients(grads_and_vars)
    batch_size = tf.shape(batch['x'])[0]
    num_examples += batch_size
    loss_sum += outputs.loss * tf.cast(batch_size, tf.float32)        

weights_delta = tf.nest.map_structure(lambda a, b: a - b,
                                      model_weights.trainable,
                                      initial_weights.trainable)
client_weight = tf.cast(num_examples, tf.float32)

client_loss = loss_sum #add

return ClientOutput(weights_delta, client_weight, loss_sum / client_weight,client_loss) 

There are the following attributes in client_output

weights_delta = attr.ib()
client_weight = attr.ib()
model_output = attr.ib()
client_loss = attr.ib() 

After that, I made the client_output in the form of a sequence throughcollected_output = tff.federated_collect(client_output) and round_model_delta = tff.federated_map(selecting_fn,(collected_output,weight_denom))in here .

   @tff.federated_computation(federated_server_state_type,
                           federated_dataset_type)

    def run_one_round(server_state, federated_dataset):
    
    server_message = tff.federated_map(server_message_fn, server_state)
    server_message_at_client = tff.federated_broadcast(server_message)

    client_outputs = tff.federated_map(
        client_update_fn, (federated_dataset, server_message_at_client))

    weight_denom = client_outputs.client_weight

    collected_output = tff.federated_collect(client_outputs)  # add        
    
    round_model_delta = tff.federated_map(selecting_fn,(collected_output,weight_denom)) #add       

    server_state = tff.federated_map(server_update_fn,(server_state, round_model_delta))

    round_loss_metric = tff.federated_mean(client_outputs.model_output, weight=weight_denom)

    return server_state, round_loss_metric

Also, the following code is added here to implement the selecting_fn function.

@tff.tf_computation()  # append
def selecting_fn(collected_output,weight_denom):
    #TODO
    return round_model_delta

I am not sure if it is correct to write the code in the above way.I tried in various ways, but mainly TypeError: The value to be mapped must be a FederatedType or implicitly convertible to a FederatedType (got a <<model_weights=<trainable=<float32[5,5,1,32],float32[32] ,float32[5,5,32,64],float32[64],float32[3136,512],float32[512],float32[512,10],float32[10]>,non_trainable=<>>,optimizer_state= <int64>,round_num=int32>@SERVER,{<float32[5,5,1,32],float32[32],float32[5,5,32,64],float32[64],float32[3136,512 ],float32[512],float32[512,10],float32[10]>}@CLIENTS>) I get this error.

I wonder how the sequence type collected_output accesses each client's client_loss(= loss_sum) and sorts them, and also wonders what method to use when calculating the weighted average with weight_denom applied.

解决方案

One issue I see is that in the call tff.federated_map(selecting_fn,(collected_output,weight_denom), the collected_output will be placed at tff.SERVER and the weight_denom will be placed at tff.CLIENTS, so that will not work. I think you want to get everything to tff.SERVER first.

I am not sure what you behavior you need, but here is a sample code which you can start from and develop. It starts from a client value (say its ID), samples a random value, collects (ID, value) pairs to the server, and then selects the pairs with largest value -- seems similar to what you describe.

@tff.tf_computation()
def client_sample_fn():
  return tf.random.uniform((1,))

# Type annotation optional for tff.tf_computation. Added here for clarity.
idx_sample_type = tff.to_type(((tf.int32, (1,)), (tf.float32, (1,))))
@tff.tf_computation(tff.SequenceType(idx_sample_type))
def select_fn(idx_sample_dataset):  # Inside, this is a tf.data.Dataset.
  # Concatenate all pairs in the dataset.
  concat_fn = lambda a, b: tf.concat([a, b], axis=0)
  reduce_fn = lambda x, y: (concat_fn(x[0], y[0]), concat_fn(x[1], y[1]))
  reduce_zero = (tf.constant((), dtype=tf.int32, shape=(0,)),
                 tf.constant((), dtype=tf.float32, shape=(0,)))
  idx_tensor, sample_tensor = idx_sample_dataset.reduce(reduce_zero, reduce_fn)
  # Find 3 largest samples.
  top_3_val, top_3_idx = tf.math.top_k(sample_tensor, k=3)
  return tf.gather(idx_tensor, top_3_idx), top_3_val

@tff.federated_computation(tff.type_at_clients((tf.int32, (1,))))
def fed_fn(client_idx):
  client_sample = tff.federated_eval(client_sample_fn, tff.CLIENTS)
  # First zip, to have a dataset of pairs, rather than a pair of datasets.
  client_idx_sample_pair = tff.federated_zip((client_idx, client_sample))
  collected_idx_sample_pair = tff.federated_collect(client_idx_sample_pair)
  return tff.federated_map(select_fn, collected_idx_sample_pair)

client_idx = [(10,), (11,), (12,), (13,), (14,)]
fed_fn(client_idx)

With a sample output:

(array([11, 10, 14], dtype=int32), array([0.8220736 , 0.81413555, 0.6984291 ], dtype=float32))

这篇关于如何创建使用几个客户端权重的 FL 算法?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-19 17:46