Integrating Spark Streaming with Kafka in Java for stream computing

  • Update 2017/6/26: I have since taken over the search system and picked up a lot of new insights over the past six months. I'm too lazy to rewrite this rough post, so please read it together with my newer post to make sense of the rough code below.
  • Background: there are plenty of articles on Spark Streaming, but most use Scala. Since our e-commerce real-time recommendation project is Java-first, I hit a few pitfalls and wrote a Java implementation. The code is fairly stream-of-consciousness; go easy on it, and discussion is welcome.
  • Flow: Spark Streaming reads users' real-time click data from Kafka; after filtering, it reads the item-similarity matrix from Redis and the user's historical behavior from the DB, computes interest scores in real time, and writes the result both to Redis (for the API layer to read and display) and to HDFS (for offline precision/recall evaluation).
  • Note: from what I understand, in large real-time recommendation systems collaborative filtering is generally used only to generate the candidate set; the interest-score computation is replaced by a rerank stage driven by strategies such as CTR prediction, i.e. an online rerank service would be called inside calculateInterest (a sketch of this hook appears after the code below).
  • Update 12/13: recall is unchanged; we currently use CTR prediction plus rule-based ranking, with LTR (learning to rank) planned next.

  • Enough talk, here's the code:

public class Main {
    static final String ZK_QUORUM = "*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181/kafka";
    static final String GROUP = "test-consumer-group";
    static final String TOPICSS = "user_trace";
    static final String NUM_THREAD = "64";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("main.java.computingCenter");
        // Create the context with a 2-second batch size: read from Kafka every two seconds
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(NUM_THREAD);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = TOPICSS.split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String lines) {
                // Kafka message format:
                // {"Topic":"user_trace","PartitionKey":"0","TimeStamp":1471524044018,
                //  "Data":"0=163670589171371918%3A196846178238302087","LogId":"0",
                //  "ContentType":"application/x-www-form-urlencoded"}
                List<String> arr = new ArrayList<String>();
                for (String s : lines.split(" ")) {
                    Map<String, Object> j = JSON.parseObject(s);
                    String s1 = "";
                    String s2 = "";
                    try {
                        s1 = URLDecoder.decode(j.get("Data").toString(), "UTF-8");
                        s2 = s1.split("=")[1];
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    arr.add(s2);
                }
                return arr;
            }
        });

        JavaPairDStream<String, String> goodsSimilarityLists = words.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                // Filter out malformed records: a valid record is "userId:goodsId"
                return s.split(":").length == 2;
            }
        }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
            // Process each record partition by partition
            @Override
            public Iterable<Tuple2<String, String>> call(Iterator<String> s) throws Exception {
                ArrayList<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
                while (s.hasNext()) {
                    String x = s.next();
                    String userId = x.split(":")[0];
                    String goodsId = x.split(":")[1];
                    System.out.println(x);
                    LinkedHashMap<Long, Double> recommendMap = null;
                    try {
                        // This service reads from Redis, computes real-time interest scores,
                        // and writes the recommendation list back to Redis for the API layer
                        CalculateInterestService calculateInterestService = new CalculateInterestService();
                        try {
                            recommendMap = calculateInterestService.calculateInterest(userId, goodsId);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        String text = "";
                        int count = 0;
                        for (Map.Entry<Long, Double> entry : recommendMap.entrySet()) {
                            text = text + entry.getKey();
                            if (count == recommendMap.size() - 1) {
                                break;
                            }
                            count = count + 1;
                            text = text + "{/c}";
                        }
                        text = System.currentTimeMillis() + ":" + text;
                        result.add(new Tuple2<String, String>(userId, text));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                return result;
            }
        });

        goodsSimilarityLists.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) throws Exception {
                // Print the RDD for easier debugging
                System.out.println(rdd.collect());
                return null;
            }
        });

        JavaPairDStream<Text, Text> goodsSimilarityListsText = goodsSimilarityLists.mapToPair(
                new PairFunction<Tuple2<String, String>, Text, Text>() {
            @Override
            public Tuple2<Text, Text> call(Tuple2<String, String> ori) throws Exception {
                // Convert to org.apache.hadoop.io.Text so saveAsHadoopFiles can write to HDFS
                return new Tuple2<Text, Text>(new Text(ori._1), new Text(ori._2));
            }
        });

        // Write a copy to HDFS for offline precision/recall evaluation
        goodsSimilarityListsText.saveAsHadoopFiles("/user/hadoop/recommend_list/rl", "123",
                Text.class, Text.class, SequenceFileOutputFormat.class);

        jssc.start();
        jssc.awaitTermination();
    }
}
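A side note on the Kafka connector: the code above uses the receiver-based KafkaUtils.createStream, which tracks offsets in ZooKeeper. On the same Spark 1.x / spark-streaming-kafka (Kafka 0.8) API, the direct stream is the usual alternative, reading each Kafka partition without a receiver. A minimal sketch, assuming a placeholder broker list:

// Direct (receiver-less) Kafka stream on the same 0.8 connector; offsets are managed
// via Spark checkpoints rather than ZooKeeper. The broker list here is a placeholder.
// Needs: kafka.serializer.StringDecoder, org.apache.spark.streaming.kafka.KafkaUtils,
//        org.apache.spark.streaming.api.java.JavaPairInputDStream
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
Set<String> topicsSet = new HashSet<String>(Arrays.asList(TOPICSS.split(",")));
JavaPairInputDStream<String, String> directMessages = KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,               // key / value types
        StringDecoder.class, StringDecoder.class, // kafka.serializer decoders
        kafkaParams, topicsSet);
// The rest of the pipeline (map to tuple2._2(), flatMap, filter, ...) is unchanged.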
public class CalculateInterestService {
    private String dictKey = "greate_item_sim_2.0";
    private String recommendTable = "great_recommend_table_2.0";
    static final String HIGO_BASE_URL = "jdbc:mysql://*.*.*.*:3212/*";
    static final String HIGO_BASE_USER = "*";
    static final String HIGO_BASE_PASS = "*";

    public LinkedHashMap<Long, Double> calculateInterest(String userId, String traceGoodsId) {
        LinkedHashMap<Long, Double> sortedMap = new LinkedHashMap<Long, Double>();
        String[] simGoods = RedisHelper.getInstance().hget(dictKey, traceGoodsId).split(",");
        // The user history should be stored as action:goodsId:timestamp; needs refactoring
        // so that BI writes it into a dedicated table
        HashMap<Long, String> userTrace = null;
        try {
            userTrace = getUserTrace(userId);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return sortedMap;
        }
        HashMap<Long, Double> recommendMap = new HashMap<Long, Double>();
        String[] simGoodsIds = new String[simGoods.length];
        for (int i = 0; i < simGoods.length; i++) {
            simGoodsIds[i] = simGoods[i].split(":")[0];
        }
        List<String> pSimGoodsIds = RedisHelper.getInstance().hmget(dictKey, simGoodsIds);
        HashMap<Long, String> predictSimGoodsIds = new HashMap<Long, String>();
        for (int i = 0; i < simGoodsIds.length; i++) {
            predictSimGoodsIds.put(Long.parseLong(simGoodsIds[i]), pSimGoodsIds.get(i));
        }
        for (String item : simGoods) {
            // TODO: needs optimizing
            Double totalSum = 0.0;
            Double sum = 0.0;
            Long originGoodsId = Long.parseLong(item.split(":")[0]);
            for (String predictGoods : predictSimGoodsIds.get(originGoodsId).split(",")) {
                Long goodsId = Long.parseLong(predictGoods.split(":")[0]);
                Double sim = Double.valueOf(predictGoods.split(":")[1]);
                totalSum = totalSum + sim;
                Double score = 0.0;
                if (!userTrace.containsKey(goodsId)) {
                    // TODO: the user rating matrix is too sparse and needs SVD to fill in
                    // missing ratings; for now unrated items get the default score 0.1
                    userTrace.put(goodsId, "default");
                }
                String action = userTrace.get(goodsId);
                if (action.equals("click")) {
                    score = 0.2;
                } else if (action.equals("favorate")) {
                    score = 0.4; // assumed: the original left this branch empty
                } else if (action.equals("add_cart")) {
                    score = 0.6;
                } else if (action.equals("order")) {
                    score = 0.8;
                } else if (action.equals("default")) {
                    score = 0.1;
                }
                // The similarity dictionary should store goodsId:sim; needs refactoring
                sum = sum + score * sim;
            }
            Double predictResult = sum / totalSum;
            recommendMap.put(originGoodsId, predictResult);
        }
        // Sort the recommend list by predicted interest, descending
        List<Map.Entry<Long, Double>> list = new ArrayList<Map.Entry<Long, Double>>(recommendMap.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<Long, Double>>() {
            @Override
            public int compare(Map.Entry<Long, Double> o1, Map.Entry<Long, Double> o2) {
                return o2.getValue().compareTo(o1.getValue());
            }
        });
        for (Map.Entry<Long, Double> tmpEntry : list) {
            sortedMap.put(tmpEntry.getKey(), tmpEntry.getValue());
        }
        writeRecommendListToRedis(userId, sortedMap);
        return sortedMap;
    }

    private HashMap<Long, String> getUserTrace(String userId) throws ClassNotFoundException {
        Class.forName("com.mysql.jdbc.Driver");
        PreparedStatement stmt = null;
        Connection conn = null;
        UserTrace userTrace = new UserTrace();
        try {
            conn = DriverManager.getConnection(HIGO_BASE_URL, HIGO_BASE_USER, HIGO_BASE_PASS);
            // Bind userId as a parameter rather than concatenating it, to avoid SQL injection
            String sql = "select * from t_pandora_goods_record where account_id=?";
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, userId);
            ResultSet rs = stmt.executeQuery();
            while (rs.next()) {
                userTrace.setId(Long.parseLong(rs.getString(1)));
                userTrace.setAccountId(Long.parseLong(rs.getString(2)));
                userTrace.setGoodsIds(rs.getString(3));
                userTrace.setMtime(rs.getString(4));
            }
            stmt.close();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        HashMap<Long, String> hm = new HashMap<Long, String>();
        if (userTrace.getGoodsIds() == null) {
            // No history for this user: return an empty trace instead of an NPE
            return hm;
        }
        String[] goodsActionTimestamp = userTrace.getGoodsIds().split(",");
        for (String ac : goodsActionTimestamp) {
            Long goodsId = Long.parseLong(ac.split(":")[0]);
            // String action = ac.split(":")[1];
            // String timestamp = ac.split(":")[2];
            // hack: next step is for BI to write user history into a table in
            // action:goodsId:timestamp format; the timestamp will later feed the weighting
            String action = "click";
            hm.put(goodsId, action);
        }
        return hm;
    }

    private void writeRecommendListToRedis(String userId, LinkedHashMap<Long, Double> sortedMap) {
        String recommendList = "";
        int count = 0;
        for (Map.Entry<Long, Double> entry : sortedMap.entrySet()) {
            recommendList = recommendList + entry.getKey();
            if (count == sortedMap.size() - 1) {
                break;
            }
            count = count + 1;
            recommendList = recommendList + ",";
        }
        RedisHelper.getInstance().hset(recommendTable, userId, recommendList);
    }
}
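To make the scoring explicit: for each candidate item i, calculateInterest computes a similarity-weighted average of the user's action scores over i's neighbor set N(i):

    interest(u, i) = \frac{\sum_{j \in N(i)} sim(i, j) \cdot score(u, j)}{\sum_{j \in N(i)} sim(i, j)}

For example, with two neighbors of similarity 0.9 and 0.5, where the user clicked the first (score 0.2) and ordered the second (score 0.8), the result is (0.9 * 0.2 + 0.5 * 0.8) / (0.9 + 0.5) = 0.58 / 1.4 ≈ 0.41.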
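RedisHelper is referenced throughout but never shown in the post. A minimal sketch of what it might look like, assuming the Jedis client and a singleton wrapping a connection pool (the class layout and pool address are assumptions, not the original implementation):

import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class RedisHelper {
    private static final RedisHelper INSTANCE = new RedisHelper();
    // Placeholder address; the real pool configuration is not shown in the post
    private final JedisPool pool = new JedisPool("127.0.0.1", 6379);

    public static RedisHelper getInstance() {
        return INSTANCE;
    }

    public String hget(String key, String field) {
        Jedis jedis = pool.getResource();
        try {
            return jedis.hget(key, field);
        } finally {
            jedis.close(); // returns the connection to the pool
        }
    }

    public List<String> hmget(String key, String[] fields) {
        Jedis jedis = pool.getResource();
        try {
            return jedis.hmget(key, fields);
        } finally {
            jedis.close();
        }
    }

    public void hset(String key, String field, String value) {
        Jedis jedis = pool.getResource();
        try {
            jedis.hset(key, field, value);
        } finally {
            jedis.close();
        }
    }
}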
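Finally, for the rerank idea mentioned in the notes above: a minimal sketch of how an online rerank call could replace the similarity-based sort at the end of calculateInterest. RerankClient and its rerank(userId, candidates) method are hypothetical placeholders for whatever CTR service is actually deployed:

// Hypothetical alternative ending for calculateInterest: hand the CF candidate set
// to an online rerank (e.g. CTR) service and keep its ordering. RerankClient and
// its rerank() signature are assumptions, not a real API.
private LinkedHashMap<Long, Double> rerankCandidates(String userId, HashMap<Long, Double> recommendMap) {
    List<Long> candidates = new ArrayList<Long>(recommendMap.keySet());
    List<Long> reranked = RerankClient.getInstance().rerank(userId, candidates);
    LinkedHashMap<Long, Double> sortedMap = new LinkedHashMap<Long, Double>();
    for (int i = 0; i < reranked.size(); i++) {
        // Keep the rerank order; store a descending pseudo-score so the
        // downstream Redis/HDFS output format stays unchanged
        sortedMap.put(reranked.get(i), 1.0 - (double) i / reranked.size());
    }
    return sortedMap;
}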

Reposted from: https://my.oschina.net/woter/blog/1611987
