Hazelcast 简明教程

Hazelcast - Map Reduce & Aggregations

MapReduce 是一种计算模型,在您有大量数据且需要多台机器(即分布式环境)计算数据时,该模型对数据处理非常有用。它涉及将数据映射为键值对,然后进行“缩减”,即对这些键进行分组并在值上执行操作。

鉴于 Hazelcast 是在考虑分布式环境的情况下设计的,因此它自然而然地实现了 Map-Reduce 框架。

让我们用一个例子看看该怎么做。

例如,让我们假设我们有关于汽车(品牌和车号)及其车主的数据。

Honda-9235, John
Hyundai-235, Alice
Honda-935, Bob
Mercedes-235, Janice
Honda-925, Catnis
Hyundai-1925, Jane

现在,我们必须找出每个品牌的汽车数量,即现代、本田等。

Example

让我们尝试使用 MapReduce 找出 −

package com.example.demo;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Job;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;

public class MapReduce {
   public static void main(String[] args) throws ExecutionException,
   InterruptedException {
      try {
         // create two Hazelcast instances
         HazelcastInstance hzMember = Hazelcast.newHazelcastInstance();
         Hazelcast.newHazelcastInstance();
         IMap<String, String> vehicleOwnerMap=hzMember.getMap("vehicleOwnerMap");
         vehicleOwnerMap.put("Honda-9235", "John");
         vehicleOwnerMap.putc"Hyundai-235", "Alice");
         vehicleOwnerMap.put("Honda-935", "Bob");
         vehicleOwnerMap.put("Mercedes-235", "Janice");
         vehicleOwnerMap.put("Honda-925", "Catnis");
         vehicleOwnerMap.put("Hyundai-1925", "Jane");
         KeyValueSource<String, String> kvs=KeyValueSource.fromMap(vehicleOwnerMap);
         JobTracker tracker = hzMember.getJobTracker("vehicleBrandJob");
         Job<String, String> job = tracker.newJob(kvs);
         ICompletableFuture<Map<String, Integer>> myMapReduceFuture =
            job.mapper(new BrandMapper())
            .reducer(new BrandReducerFactory()).submit();
         Map<String, Integer&g; result = myMapReduceFuture.get();
         System.out.println("Final output: " + result);
      } finally {
         Hazelcast.shutdownAll();
      }
   }
   private static class BrandMapper implements Mapper<String, String, String, Integer> {
      @Override
      public void map(String key, String value, Context<String, Integer>
      context) {
         context.emit(key.split("-", 0)[0], 1);
      }
   }
   private static class BrandReducerFactory implements ReducerFactory<String, Integer, Integer> {
      @Override
      public Reducer<Integer, Integer> newReducer(String key) {
         return new BrandReducer();
      }
   }
   private static class BrandReducer extends Reducer<Integer, Integer> {
      private AtomicInteger count = new AtomicInteger(0);
      @Override
      public void reduce(Integer value) {
         count.addAndGet(value);
      }
      @Override
      public Integer finalizeReduce() {
         return count.get();
      }
   }
}

让我们尝试理解此代码 −

  1. 我们创建 Hazelcast 成员。在该示例中,我们有一个成员,但可以有多个成员。

  2. 我们使用模拟数据创建地图,并由此创建键值存储。

  3. 我们创建 Map-Reduce 作业,并要求它将键值存储用作数据。

  4. 然后,我们将作业提交到集群并等待完成。

  5. 映射器创建一个键,即从原始键中提取品牌信息,将值设置为 1,然后以键值对 (K-V) 形式将该信息发送到归约器。

  6. 归约器简单地对值求和,按键(即品牌名称)对数据进行分组。

Output

代码的输出 −

Final output: {Mercedes=1, Hyundai=2, Honda=3}