我正在尝试使用Hazelcast的map-reduce功能执行聚合操作,该操作需要访问位于同一位置的条目。使用data-affinity控制同一位置。
想象一下Hazelcast documentation on data-affinity中使用的经典客户/订单模型。在我的示例中,例如,给定此数据集,我想返回一个包含客户及其所有订单总和的客户摘要:
customer_id | name
------------------
1 | Dave
2 | Kate
order_id | customer_id | value
------------------------------
1 | 1 | 5
2 | 1 | 10
3 | 2 | 12
我想退货:
customer_id | name | value
--------------------------
1 | Dave | 15
2 | Kate | 12
这足够简单,但是使用数据亲和性的原因是能够通过简单地获取该分区内的所有顺序,从而在保存数据的各个分区内执行求和逻辑,从而避免任何跨JVM的通信。
因此,我的问题是,如何从Mapper或类似地址中获取位于同一缓存中的条目?
编辑:
在@noctarius的回答和评论之后,下面是一些代码(我已尝试使其尽可能简短),突出显示了我只希望从当前分区中获得订单的点。
订单键类如下所示:
public class OrderKey implements PartitionAware<CustomerIdentity>
{
...
@Override
public CustomerIdentity getPartitionKey()
{
return this.customerIdentity;
}
...
}
而
Mapper
像这样:public class OrderSumMapper implements Mapper<CustomerKey, Customer, CustomerKey, CustomerOrderTotal>, HazelcastInstanceAware
{
...
@Override
public void map(CustomerKey customerKey, Customer customer, Context<CustomerKey, CustomerOrderTotal> context)
{
Predicate ordersForCustomer = new OrdersForCustomerPredicate(customerKey);
int totalValue = 0;
//******************************************************************
//
// Given orders are co-located with the customer, how do you ensure
// this call to get the orders only runs in the current partition?
//
//******************************************************************
for (Order order : hazelcastInstance.getMap("orders").values(ordersForCustomer))
{
totalValue += order.getValue();
}
context.emit(customerKey, new CustomerOrderTotal(customer, total));
}
...
}
突出显示的呼叫
hazelcastInstance.getMap("orders").values(ordersForCustomer)
通常会命中群集中的所有节点,但是由于数据位于同一位置,因此这是不必要的开销。然后回到我原来的问题,如何获得订单,以便仅返回当前分区中的订单?
最佳答案
您只需将当前节点的HazelcastInstance注入到Mapper中,然后检索第二个数据结构以读取数据。
在这里查看基本示例:
https://github.com/noctarius/hazelcast-mapreduce-presentation/blob/master/src/main/java/com/hazelcast/examples/tutorials/impl/SalaryMapper.java