由于JavaReceiverInputDStream<String> lines = ssc.receiverStream(Receiver<T> receiver) 中 没有直接对接MetaQ的工具,当然可以实用使用spark streaming已经有的工具进行转接,这里不建议,所以可以继承Receiver类重写onStart()方法

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executor; import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import com.dinpay.bdp.rcp.domain.Order;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; public abstract class MetaQReceiver<T> extends Receiver<T>{
private static final long serialVersionUID = -3240967436204273248L;
Logger logger=LoggerFactory.getLogger(MetaQReceiver.class);
private static final DateFormat df = new SimpleDateFormat("yyyyMMdd"); private String zkConnect;
private String zkRoot;
private String topic;
private String group; public MetaQReceiver(String zkConnect,String zkRoot,String topic,String group) {
super(StorageLevel.MEMORY_ONLY());
this.zkConnect=zkConnect;
this.zkRoot=zkRoot;
this.topic=topic;
this.group=group; } @Override
public void onStart() {
try{
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = this.zkConnect;// "127.0.0.1:2181";
zkConfig.zkRoot = this.zkRoot;// "/meta";
metaClientConfig.setZkConfig(zkConfig);
final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(
metaClientConfig);
ConsumerConfig consumerConfig = new ConsumerConfig(group);
// 默认最大获取延迟为5秒,这里设置成100毫秒,请根据实际应用要求做设置。
consumerConfig.setMaxDelayFetchTimeInMills(100);
final MessageConsumer consumer = sessionFactory
.createConsumer(consumerConfig);
// subscribe topic
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
@Override
public void recieveMessages(final Message message) {
try{
//T t=message2Object(new String(message.getData(),"utf-8"));
logger.info("Receive message " + new String(message.getData()));
String orderJson = new String(message.getData());
Order order = ParameterDataUtil.getObject(orderJson, Order.class);
String cardNo = order.getCard_no();
String yyyyMMdd = df.format(new Date());
String payclassId = order.getPayclass_id();
String cntKey = "DK_CNT_" + cardNo + "_" + payclassId + "_" + yyyyMMdd;
logger.info(cntKey);
System.out.println(cntKey);
T result = (T) cntKey;
if(result!=null){
store(result);
}
}catch(Exception e){
logger.error("message2Object error",e);
}
}
@Override
public Executor getExecutor() {
return null;
}
});
consumer.completeSubscribe();
}catch(Exception e){
throw new RuntimeException("metaq error",e);
}
} @Override
public void onStop() {
} //public abstract T message2Object(String message) throws Exception;
}

下面该段代码可以减掉,若有需要转Object可以在此进行处理

public class MetaQReceiverStreaming extends MetaQReceiver<String>{

    private static final long serialVersionUID = -2290689243756756929L;

    public MetaQReceiverStreaming(String zkConnect, String zkRoot, String topic, String group) {
super(zkConnect, zkRoot, topic, group);
} /*@Override
public String message2Object(String message) throws Exception {
return message;
}*/ }

接下来通过spark streaming进行metaq的消息处理

ort java.util.Arrays;
import java.util.List; import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext; import com.dinpay.bdp.rcp.util.Constant;
import com.dinpay.bdp.rcp.util.MetaQReceiverStreaming;
import com.google.common.base.Optional;
import com.sun.xml.bind.v2.runtime.reflect.opt.Const; import scala.Tuple2;
/**
* @author ll
*/
public class MetaqStreamingCount { public static void main(String[] args) {
String zkConnect=Constant.METAZK;
String zkRoot="/meta";
String topic=Constant.METATOPIC;
String group=Constant.METAGROUP; //屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
SparkConf sparkConf = new SparkConf().setAppName("MetaqStreamingCount").setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); JavaReceiverInputDStream<String> lines = ssc.receiverStream(new MetaQReceiverStreaming(zkConnect,zkRoot,topic,group)); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
}); JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) {
return new Tuple2<>(word, 1);
}
}); JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>,
Optional<Integer>>() {//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) @Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state){
//第一个参数就是key传进来的数据,第二个参数是已经有的数据
Integer updateValue = 0;//如果第一次,state没有,updateValue为0,如果有就获取
if(state.isPresent()){
updateValue = state.get();
}
//遍历batch传进来的数据可以一直加,随着时间的流式会不断的累加相同key的value结果
for (Integer value : values) {
updateValue += value;
}
return Optional.of(updateValue);//返回更新的值
}
}); wordsCount.print();
//需要将结果保存到Codis中
ssc.checkpoint("checkpoint");
ssc.start();
ssc.awaitTermination();
ssc.close(); }
}
05-11 20:51