From 801fb682049255f0ba81df38d10ebc52dbfb017e Mon Sep 17 00:00:00 2001 From: tangwang Date: Fri, 17 Oct 2025 21:58:48 +0800 Subject: [PATCH] add cpp swing for mem optimize --- collaboration/.gitignore | 32 -------------------------------- collaboration/Makefile | 44 -------------------------------------------- collaboration/README.md | 17 ----------------- collaboration/Swing快速开始.md | 70 ---------------------------------------------------------------------- collaboration/bin/icf_simple | Bin 45912 -> 0 bytes collaboration/bin/swing | Bin 72744 -> 0 bytes collaboration/bin/swing_symmetric | Bin 59696 -> 0 bytes collaboration/eval.py | 105 --------------------------------------------------------------------------------------------------------- collaboration/include/BitMap.h | 45 --------------------------------------------- collaboration/include/utils.h | 42 ------------------------------------------ collaboration/run.sh | 134 -------------------------------------------------------------------------------------------------------------------------------------- collaboration/src/icf_simple.cc | 170 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- collaboration/src/swing.cc | 409 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- collaboration/src/swing_symmetric.cc | 234 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ collaboration/src/ucf.py | 145 ------------------------------------------------------------------------------------------------------------------------------------------------- collaboration/utils/utils.cc | 55 ------------------------------------------------------- offline_tasks/collaboration/.gitignore | 32 ++++++++++++++++++++++++++++++++ offline_tasks/collaboration/Makefile | 44 ++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/README.md | 17 +++++++++++++++++ offline_tasks/collaboration/Swing快速开始.md | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/bin/icf_simple | Bin 0 -> 45912 bytes offline_tasks/collaboration/bin/swing | Bin 0 -> 72744 bytes offline_tasks/collaboration/bin/swing_symmetric | Bin 0 -> 59696 bytes offline_tasks/collaboration/eval.py | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/include/BitMap.h | 45 +++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/include/utils.h | 42 ++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/run.sh | 134 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/src/icf_simple.cc | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/src/swing.cc | 409 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/src/swing_symmetric.cc | 234 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/src/ucf.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/collaboration/utils/utils.cc | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ offline_tasks/doc/Redis数据规范.md | 42 ++++++++++++++++++++++++++++++++++++------ 33 files changed, 1538 insertions(+), 1508 deletions(-) delete mode 100644 collaboration/.gitignore delete mode 100644 collaboration/Makefile delete mode 100644 collaboration/README.md delete mode 100644 collaboration/Swing快速开始.md delete mode 100755 collaboration/bin/icf_simple delete mode 100755 collaboration/bin/swing delete mode 100755 collaboration/bin/swing_symmetric delete mode 100644 collaboration/eval.py delete mode 100644 collaboration/include/BitMap.h delete mode 100644 collaboration/include/utils.h delete mode 100644 collaboration/run.sh delete mode 100644 collaboration/src/icf_simple.cc delete mode 100644 collaboration/src/swing.cc delete mode 100644 collaboration/src/swing_symmetric.cc delete mode 100644 collaboration/src/ucf.py delete mode 100644 collaboration/utils/utils.cc create mode 100644 offline_tasks/collaboration/.gitignore create mode 100644 offline_tasks/collaboration/Makefile create mode 100644 offline_tasks/collaboration/README.md create mode 100644 offline_tasks/collaboration/Swing快速开始.md create mode 100755 offline_tasks/collaboration/bin/icf_simple create mode 100755 offline_tasks/collaboration/bin/swing create mode 100755 offline_tasks/collaboration/bin/swing_symmetric create mode 100644 offline_tasks/collaboration/eval.py create mode 100644 offline_tasks/collaboration/include/BitMap.h create mode 100644 offline_tasks/collaboration/include/utils.h create mode 100644 offline_tasks/collaboration/run.sh create mode 100644 offline_tasks/collaboration/src/icf_simple.cc create mode 100644 offline_tasks/collaboration/src/swing.cc create mode 100644 offline_tasks/collaboration/src/swing_symmetric.cc create mode 100644 offline_tasks/collaboration/src/ucf.py create mode 100644 offline_tasks/collaboration/utils/utils.cc diff --git a/collaboration/.gitignore b/collaboration/.gitignore deleted file mode 100644 index 259148f..0000000 --- a/collaboration/.gitignore +++ /dev/null @@ -1,32 +0,0 @@ -# Prerequisites -*.d - -# Compiled Object files -*.slo -*.lo -*.o -*.obj - -# Precompiled Headers -*.gch -*.pch - -# Compiled Dynamic libraries -*.so -*.dylib -*.dll - -# Fortran module files -*.mod -*.smod - -# Compiled Static libraries -*.lai -*.la -*.a -*.lib - -# Executables -*.exe -*.out -*.app diff --git a/collaboration/Makefile b/collaboration/Makefile deleted file mode 100644 index d6122d1..0000000 --- a/collaboration/Makefile +++ /dev/null @@ -1,44 +0,0 @@ -# Build targets - -USER_FLAGS = -Wno-unused-result -Wno-unused-but-set-variable -Wno-sign-compare -Wall -USER_LIBS = - -# Compiler flags -CXX = g++ -std=c++11 -CXXFLAGS = $(USER_FLAGS) -O3 -I ./include -LDFLAGS = -lpthread - -# The names of the executables that will be built -target_swing = bin/swing -target_icf_simple = bin/icf_simple -target_swing_symmetric = bin/swing_symmetric - -# Ensure the bin directory exists -BIN_DIR = bin - -# Declare phony targets -.PHONY: all clean - -# Build all targets -all: $(BIN_DIR) $(target_swing) $(target_icf_simple) $(target_swing_symmetric) - -# Create bin directory if it doesn't exist -$(BIN_DIR): - mkdir -p $(BIN_DIR) - -# Build target swing -$(target_swing): src/swing.cc utils/utils.cc include/* - $(CXX) $(LDFLAGS) -o $(target_swing) src/swing.cc utils/utils.cc $(CXXFLAGS) - -# Build target swing_1st_order -$(target_icf_simple): src/icf_simple.cc utils/utils.cc include/* - $(CXX) $(LDFLAGS) -o $(target_icf_simple) src/icf_simple.cc utils/utils.cc $(CXXFLAGS) - -# Build target swing_symmetric -$(target_swing_symmetric): src/swing_symmetric.cc utils/utils.cc include/* - $(CXX) $(LDFLAGS) -o $(target_swing_symmetric) src/swing_symmetric.cc utils/utils.cc $(CXXFLAGS) - -# Clean build files -clean: - rm -f $(target_swing) $(target_icf_simple) $(target_swing_symmetric) - find . -name '*.o' -delete diff --git a/collaboration/README.md b/collaboration/README.md deleted file mode 100644 index f999279..0000000 --- a/collaboration/README.md +++ /dev/null @@ -1,17 +0,0 @@ - -协同算法 - -item协同: -标准swing算法 -swing_symmetric.cc - -改进的(非对称)swing算法 -swing.cc - -简单的item协同: -icf_simple.cc - - -用户协同: -ucf.py - diff --git a/collaboration/Swing快速开始.md b/collaboration/Swing快速开始.md deleted file mode 100644 index 8efd347..0000000 --- a/collaboration/Swing快速开始.md +++ /dev/null @@ -1,70 +0,0 @@ -# Swing算法快速开始 - -## 快速运行(3步) - -### 1. 生成Session文件 - -```bash -cd /home/tw/recommendation/offline_tasks -python3 scripts/generate_session.py --lookback_days 730 -``` - -这会生成: -- `output/session.txt.YYYYMMDD` - 标准格式(uid \t json) -- `output/session.txt.YYYYMMDD.cpp` - C++格式(纯json) - -### 2. 运行Swing算法 - -```bash -cd /home/tw/recommendation/collaboration -bash run.sh -``` - -### 3. 查看结果 - -```bash -# 查看可读版本(带商品名称) -cat output/swing_similar_readable.txt | head -20 - -# 或查看原始版本(仅ID) -cat output/swing_similar.txt | head -20 -``` - -## 输出文件 - -- `output_YYYYMMDD/swing_similar.txt` - Swing相似度结果(ID格式) -- `output_YYYYMMDD/swing_similar_readable.txt` - 可读版本(ID:名称格式) - -## 配置修改 - -如需调整参数,编辑 `run.sh`: - -```bash -# 数据路径 -SESSION_DATA_DIR="../offline_tasks/output" - -# Swing参数 -ALPHA=0.7 # 0.5-1.0,越小越关注用户共同行为 -THRESHOLD1=1 # 1-5,交互强度阈值 -THRESHOLD2=3 # 1-10,相似度计算阈值 -THREAD_NUM=4 # 线程数 -``` - -## 详细文档 - -查看完整文档:`../offline_tasks/SWING_USAGE.md` - -## 常见问题 - -**Q: 如何查看商品名称?** -A: 结果文件 `swing_similar_readable.txt` 已自动添加商品名称 - -**Q: 如何调整相似商品数量?** -A: 修改 `src/swing.cc` 中的 `max_sim_list_len`(默认300) - -**Q: Session文件找不到?** -A: 先运行步骤1生成session文件 - -**Q: 运行时间?** -A: 1万商品约1-5分钟,10万商品约10-30分钟 - diff --git a/collaboration/bin/icf_simple b/collaboration/bin/icf_simple deleted file mode 100755 index 3e2f6ca..0000000 Binary files a/collaboration/bin/icf_simple and /dev/null differ diff --git a/collaboration/bin/swing b/collaboration/bin/swing deleted file mode 100755 index 7e901cf..0000000 Binary files a/collaboration/bin/swing and /dev/null differ diff --git a/collaboration/bin/swing_symmetric b/collaboration/bin/swing_symmetric deleted file mode 100755 index 0ce2135..0000000 Binary files a/collaboration/bin/swing_symmetric and /dev/null differ diff --git a/collaboration/eval.py b/collaboration/eval.py deleted file mode 100644 index 7cdcef1..0000000 --- a/collaboration/eval.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/home/SanJunipero/anaconda3/bin/python -# -*- coding:UTF-8 -*- -import os,sys,json,re,time -import numpy as np -import pandas as pd -from itertools import combinations -import logging -import traceback -import cgitb -from argparse import ArgumentParser - -sim_index = {} - -max_fea = 20 #最多用x个历史交互id去召回 -max_recall_len = 1200 - -def para_define(parser): - parser.add_argument('-s', '--sim_index', type=str, default='') - -def parse_sim_item_pair(x): - x = x.split(':') - return (int(x[0]), float(x[1])) - -def parse_session_item_pair(x): - x = x.split(':') - return (int(x[0][1:-1]), float(x[1])) - -def run_eval(FLAGS): - with open(FLAGS.sim_index) as f: - for line in f: - segs = line.rstrip().split('\t') - if len(segs) != 2: - continue - k, vlist = segs - sim_index[int(k)] = [parse_sim_item_pair(x) for x in vlist.split(',')] - - statis = [] - for line in sys.stdin: - line = line.strip() - segs = line.split('\t') - uid = segs[0] - session = segs[1][1:-1] - if not session: - continue - session_list = [parse_session_item_pair(x) for x in session.split(',')] - - score_list = {} - for item_id, wei in session_list[1:1+max_fea]: - for sim_item_id, sim_value in sim_index.get(item_id, []): - score_list.setdefault(sim_item_id, 0.0) - score_list[sim_item_id] += wei*sim_value - score_list.items() - sorted_score_list = sorted(score_list.items(), key = lambda k:k[1], reverse=True)[:max_recall_len] - - target_item_id = session_list[0][0] - hit_pos = -1 - for idx, (k, v) in enumerate(sorted_score_list): - if target_item_id == k: - hit_pos = idx - break - - if hit_pos == -1 or hit_pos > max_recall_len: - hit_pos = max_recall_len - info = (1, hit_pos, len(sorted_score_list), - int(hit_pos < 25), - int(hit_pos < 50), - int(hit_pos < 100), - int(hit_pos < 200), - int(hit_pos < 400), - int(hit_pos < 800), - int(hit_pos < max_recall_len), - ) - statis.append(info) - statis = np.array(statis) - - desc = '''(1, hit_pos, len(sorted_score_list), - int(hit_pos != -1 and hit_pos < 25), - int(hit_pos != -1 and hit_pos < 50), - int(hit_pos != -1 and hit_pos < 100), - int(hit_pos != -1 and hit_pos < 200), - int(hit_pos != -1 and hit_pos < 400), - int(hit_pos != -1 and hit_pos < 800), - int(hit_pos != -1), - )''' - print(desc) - - np.set_printoptions(suppress=True) - print(FLAGS.sim_index, 'mean', '\t'.join([str(x) for x in statis.mean(axis=0)]), sep='\t') - print(FLAGS.sim_index, 'sum', '\t'.join([str(x) for x in statis.sum(axis=0)]), sep='\t') - - - -def main(): - cgitb.enable(format='text') - # op config - parser = ArgumentParser() - para_define(parser) - - FLAGS, unparsed = parser.parse_known_args() - print(FLAGS) - - run_eval(FLAGS) - -if __name__ == "__main__": - main() diff --git a/collaboration/include/BitMap.h b/collaboration/include/BitMap.h deleted file mode 100644 index 8ba251a..0000000 --- a/collaboration/include/BitMap.h +++ /dev/null @@ -1,45 +0,0 @@ -#include -#include - -using namespace std; - -class BitMap -{ -public: - BitMap(size_t num) - { - _v.resize((num >> 5) + 1); - } - - void Set(size_t num) //set 1 - { - size_t index = num >> 5; - size_t pos = num & 0x1F; - _v[index] |= (1 << pos); - } - - void Reset(size_t num) //set 0 - { - size_t index = num >> 5; - size_t pos = num & 0x1F; - _v[index] &= ~(1 << pos); - } - - // - void ResetRoughly(size_t num) //set 0 - { - size_t index = num >> 5; - _v[index] = 0; - } - - bool Existed(size_t num)//check whether it exists - { - size_t index = num >> 5; - size_t pos = num & 0x1F; - return (_v[index] & (1 << pos)); - } - -private: - vector _v; -}; - diff --git a/collaboration/include/utils.h b/collaboration/include/utils.h deleted file mode 100644 index b305172..0000000 --- a/collaboration/include/utils.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef ___HEADER_SWING_UTILS___ -#define ___HEADER_SWING_UTILS___ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -template -std::ostream& operator<< (std::ostream& out, const std::vector& v) { - if (!v.empty()) { - out << '['; - std::copy(v.begin(), v.end(), std::ostream_iterator(out, ", ")); - out << "\b\b]"; - } - return out; -} - -std::string currentTimetoStr(void); - -void split(std::vector& tokens, const std::string& s, const std::string& delimiters = " "); - -bool compare_pairs(const std::pair & a, const std::pair & b); - - -bool compare_i2ulist_map_iters(const std::unordered_map >::const_iterator & a, const std::unordered_map >::const_iterator & b); - - -#endif diff --git a/collaboration/run.sh b/collaboration/run.sh deleted file mode 100644 index 201eff1..0000000 --- a/collaboration/run.sh +++ /dev/null @@ -1,134 +0,0 @@ -#!/bin/bash -source ~/.bash_profile - -# ============================================================================ -# 配置区域 - 可根据实际情况修改 -# ============================================================================ - -# 数据路径配置 -# 修改这个路径指向实际的session文件位置 -SESSION_DATA_DIR="../offline_tasks/output" - -# Swing算法参数 -ALPHA=0.7 # Swing算法的alpha参数 -THRESHOLD1=1 # 交互强度阈值1 -THRESHOLD2=3 # 交互强度阈值2 -THREAD_NUM=4 # 线程数 -SHOW_PROGRESS=1 # 是否显示进度 (0/1) - -# Python环境(如果需要特定的Python环境,在这里配置) -PYTHON_CMD="python3" - -# ============================================================================ -# 脚本执行区域 -# ============================================================================ - -# 编译C++程序 -echo "编译Swing程序..." -make -if [[ $? -ne 0 ]]; then - echo "编译失败,退出" - exit 1 -fi - -# 获取日期 -DAY=`date +"%Y%m%d"` -# 如果需要使用特定日期,取消下面的注释 -# DAY=20241017 - -echo "处理日期: ${DAY}" - -# 清理旧的输出目录(365天前)和日志(180天前) -find . -type d -name 'output_*' -ctime +365 -exec rm -rf {} \; 2>/dev/null -mkdir -p logs -find logs/ -type f -mtime +180 -exec rm -f {} \; 2>/dev/null - -# 创建输出目录 -output_dir=output_${DAY} -mkdir -p ${output_dir} - -# 确定session文件路径 -# 优先使用带日期的文件,如果不存在则使用.cpp格式的文件 -SESSION_FILE="${SESSION_DATA_DIR}/session.txt.${DAY}.cpp" -if [[ ! -f ${SESSION_FILE} ]]; then - SESSION_FILE="${SESSION_DATA_DIR}/session.txt.${DAY}" -fi - -if [[ ! -f ${SESSION_FILE} ]]; then - echo "错误: Session文件不存在: ${SESSION_FILE}" - echo "请先运行 generate_session.py 生成session文件" - exit 1 -fi - -echo "使用session文件: ${SESSION_FILE}" -echo "Swing参数: alpha=${ALPHA}, threshold1=${THRESHOLD1}, threshold2=${THRESHOLD2}, threads=${THREAD_NUM}" - -# 运行Swing算法 -# 如果session文件格式是 "uid \t json",需要用cut -f 2提取json部分 -# 如果session文件格式是纯json(每行一个),直接cat即可 -echo "开始运行Swing算法..." -if grep -q $'\t' ${SESSION_FILE}; then - # 包含tab,需要提取第二列 - echo "检测到session文件包含uid,提取json部分..." - cat ${SESSION_FILE} | cut -f 2 | bin/swing ${ALPHA} ${THRESHOLD1} ${THRESHOLD2} ${THREAD_NUM} ${output_dir} ${SHOW_PROGRESS} -else - # 纯json格式 - echo "检测到session文件为纯json格式..." - cat ${SESSION_FILE} | bin/swing ${ALPHA} ${THRESHOLD1} ${THRESHOLD2} ${THREAD_NUM} ${output_dir} ${SHOW_PROGRESS} -fi - -# 检查Swing算法是否成功执行 -if [[ $? -eq 0 ]]; then - echo "Swing算法执行成功" - - # 更新软链接指向最新输出 - if [[ -e output ]]; then - rm -rf output - fi - ln -s "${output_dir}" output - echo "软链接已更新为指向 ${output_dir}" - - # 合并结果文件 - echo "合并结果文件..." - cat output/sim_matrx.* > output/swing_similar.txt - echo "结果已合并到 output/swing_similar.txt" - - # 生成可读的debug文件(添加商品名称) - echo "生成可读的debug文件..." - DEBUG_SCRIPT="../offline_tasks/scripts/add_names_to_swing.py" - - if [[ -f ${DEBUG_SCRIPT} ]]; then - ${PYTHON_CMD} ${DEBUG_SCRIPT} output/swing_similar.txt output/swing_similar_readable.txt --debug - - if [[ $? -eq 0 ]]; then - echo "Debug文件已生成: output/swing_similar_readable.txt" - else - echo "警告: 生成debug文件失败,但Swing结果已保存" - fi - else - echo "警告: Debug脚本不存在: ${DEBUG_SCRIPT}" - echo "跳过生成可读文件" - fi - -else - echo "Swing算法执行失败,未更新软链接" - exit 1 -fi - -# ============================================================================ -# 用户协同过滤(UCF)- 可选 -# ============================================================================ - -# 如果需要运行UCF,取消下面的注释 -# echo "运行用户协同过滤..." -# # 仅使用最新的5万条数据 -# tail -n 50000 ${SESSION_FILE} > output/ucf.input -# python3 src/ucf.py output/ucf.input output/ucf.txt - -echo "全部完成!" -echo "结果文件:" -echo " - Swing相似度: ${output_dir}/swing_similar.txt" -echo " - Swing可读版: ${output_dir}/swing_similar_readable.txt" - - - diff --git a/collaboration/src/icf_simple.cc b/collaboration/src/icf_simple.cc deleted file mode 100644 index e389af2..0000000 --- a/collaboration/src/icf_simple.cc +++ /dev/null @@ -1,170 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -//#include -#include -#include -#include -#include -#include -#include -#include "utils.h" - -int max_sim_list_len = 300; - -using namespace std; - -// 定义 64 位无符号整型 -typedef uint64_t ItemID; - -int main(int argc,char *argv[]) { - - float threshold1 = 0.5; - float threshold2 = 0.5; - int show_progress = 0; - - if (argc < 5) { - cout << "usage " << argv[0] << " threshold1 threshold2 show_progress(0/1)" << endl; - return -1; - } - - threshold1 = atof(argv[1]); - threshold2 = atof(argv[2]); - show_progress = atoi(argv[3]); - - cerr << currentTimetoStr() << " start... " << endl; - cerr << " threshold1 " << threshold1 << endl; - cerr << " threshold2 " << threshold2 << endl; - - // 一阶关系(DB簇索引) - unordered_map > sim_by_1rs_relation_map(1000000); - //sim_by_1rs_relation_map.reserve(1000000); - - string line_buff; - const string delimiters(","); - - vector field_segs; - vector > item_list; - - while (getline(cin, line_buff)) { - // 格式是一个json,所以要把开头和结尾的括号去掉 - line_buff.erase(0, line_buff.find_first_not_of("{")); - line_buff.erase(line_buff.find_last_not_of("}") + 1); - field_segs.clear(); - split(field_segs, line_buff, delimiters); - - item_list.clear(); - for (size_t i = 0; i < field_segs.size(); i++) { - const char * seg_pos = strchr(field_segs[i].c_str(), ':'); - if (seg_pos == NULL || (seg_pos - field_segs[i].c_str() >= field_segs[i].length())) break; - - float value = atof(seg_pos + 1); - if (value > threshold1 || value > threshold2) { - // 开头有一个双引号 - ItemID item_id = static_cast(strtoull(field_segs[i].c_str() + 1, NULL, 10)); - item_list.push_back(make_pair(item_id, value)); - } - } - - if (item_list.size() < 2) continue; - - // append本次的itemlist - ItemID map_key = 0; - ItemID map_key_1 = 0; - ItemID map_key_2 = 0; - pair >::iterator, bool> ins_ret; - - for (vector >::const_iterator i = item_list.begin(); i != item_list.end(); ++i) { - map_key_1 = i->first; - for (vector >::const_iterator j = item_list.begin(); j != item_list.end(); ++j) { - map_key_2 = j->first; - - if (map_key_1 == map_key_2) continue; - - if (i->second > threshold1 && j->second > threshold2) { - map_key = (map_key_1 << 32) + map_key_2; - ins_ret = sim_by_1rs_relation_map.insert(make_pair(map_key, make_pair(1, j->second))); - if (!ins_ret.second) { - ins_ret.first->second.first += 1; - ins_ret.first->second.second += j->second; - } - } - if (j->second > threshold1 && i->second > threshold2) { - map_key = (map_key_2 << 32) + map_key_1; - ins_ret = sim_by_1rs_relation_map.insert(make_pair(map_key, make_pair(1, i->second))); - if (!ins_ret.second) { - ins_ret.first->second.first += 1; - ins_ret.first->second.second += i->second; - } - } - } - } - } - - unordered_map > > sim_matrix(200000); - // 计算item_i, item_j合并的打分,total_wei / num * math.log(1.5*num, 1.5). - pair > > pair_entry; - pair > >::iterator, bool> ins_ret; - - for (unordered_map >::iterator iter = sim_by_1rs_relation_map.begin(); iter != sim_by_1rs_relation_map.end(); ++iter) { - ItemID item1 = iter->first >> 32; - ItemID item2 = iter->first & 0xFFFFFFFF; - - int num = iter->second.first; - float total_wei = iter->second.second; - float merged_score = total_wei / num * log(1.5 * num); - - pair_entry.first = item1; - - ins_ret = sim_matrix.insert(pair_entry); - ins_ret.first->second.push_back(make_pair(item2, merged_score)); - } - - // staits info of sim matrix - vector sim_list_len_statis; - sim_list_len_statis.resize(max_sim_list_len + 1); - - // write sim matrix - for (unordered_map > >::iterator iter = sim_matrix.begin(); iter != sim_matrix.end(); ++iter) { - vector > & sim_list_buff = iter->second; - int sim_list_len = sim_list_buff.size(); - if (sim_list_len > 0) { - sort(sim_list_buff.begin(), sim_list_buff.end(), compare_pairs); - - cout << iter->first << "\t" << sim_list_buff[0].first << ":" << sim_list_buff[0].second; - - if (sim_list_len > max_sim_list_len) sim_list_len = max_sim_list_len; - - sim_list_len_statis[sim_list_len] += 1; - - for (int i = 1; i < sim_list_len; i++) { - cout << ',' << sim_list_buff[i].first << ':' << sim_list_buff[i].second; - } - cout << endl; - } - } - - // staits info of sim matrix - int sum_groups = accumulate(sim_list_len_statis.begin(), sim_list_len_statis.end(), (int)0); - cerr << currentTimetoStr() << " write sim matrix finished" << endl; - cerr << currentTimetoStr() << " print staits info of sim matrix... " << sim_list_len_statis.size() << endl; - cerr << currentTimetoStr() << " total keys: " << sum_groups << endl; - - int accumulate = 0; - for (int i = sim_list_len_statis.size() - 1; i > -1; i--) { - accumulate += sim_list_len_statis[i]; - fprintf(stderr, "simlist_len %4d, num %4d, accumulate %6d accumulated_rate %5.2f%\%\n", (int)i, sim_list_len_statis[i], accumulate, 100.0 * accumulate / sum_groups); - } - - return 0; -} diff --git a/collaboration/src/swing.cc b/collaboration/src/swing.cc deleted file mode 100644 index 6666c8c..0000000 --- a/collaboration/src/swing.cc +++ /dev/null @@ -1,409 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "utils.h" -#include "BitMap.h" - -using namespace std; - -// 使用 typedef 定义 itemID 类型 -typedef unsigned long long itemID; - -class Config { -public: - Config() { - - user_sessions_num = 2000000; - items_num = 160000; - - max_sim_list_len = 300; - max_session_list_len = 100; - - threshold1 = 0.5; - threshold2 = 0.5; - alpha = 0.5; - thread_num = 20; - show_progress = 0; - output_path = "result"; - } - - int load(int argc,char *argv[]) { - if (argc < 7) { - cout << "usage " << argv[0] << " alpha threshold1 threshold2 thread_num output_path show_progress(0/1) " << endl; - return -1; - } - - alpha = atof(argv[1]); - threshold1 = atof(argv[2]); - threshold2 = atof(argv[3]); - - thread_num = atoi(argv[4]); - output_path = argv[5]; - show_progress = atoi(argv[6]); - - cout << currentTimetoStr() << " start... " << endl; - cout << " threshold1 " << threshold1 << endl; - cout << " threshold2 " << threshold2 << endl; - cout << " alpha " << alpha << endl; - return 0; - } - -public: - int user_sessions_num; - int items_num; - - int max_sim_list_len; // 输出相似itemlist 最大长度 - int max_session_list_len; // 输入的 用户行为列表,截断长度(按权重排序阶段) - float threshold1; - float threshold2; - float alpha; - float thread_num; - int show_progress; - string output_path; -}; - -/** - * - * read data from stdin - * format: - * 输入的itemlist必须是按照权重排序的 - * - * {"111":3.9332,"222":0.0382,"333":0.0376} - * {"444":13.2136,"555":2.1438,"666":1.3443,"777":0.6775} - * {"888":22.0632,"999":0.0016} - * - * parm : - * config - * groups : index of user_id -> items - * i2u_map : index of item -> users - */ -int load_data(const Config & config, - vector< pair , vector > > & groups, - unordered_map, vector > > & i2u_map) { - - string line_buff; - - const string delimiters(","); - - vector field_segs; - // 每个元素是一个user的两个itemlist,first是交互强度大于threshold1的itemList,后者是强度大于threshold2的itemList - pair , vector > itemlist_pair; - - - pair , vector > > pair_entry; - pair , vector > >::iterator, bool> ins_i2u_ret; - - while (getline(cin, line_buff)) { - //格式是一个json,所以要把开头和结尾的括号去掉 - line_buff.erase(0,line_buff.find_first_not_of("{")); - line_buff.erase(line_buff.find_last_not_of("}") + 1); - //cout << line_buff << " !!!" << endl; - field_segs.clear(); - split(field_segs, line_buff, delimiters); - if (field_segs.size() < config.max_session_list_len) { - field_segs.resize(config.max_session_list_len); - } - - // field_segs是按权重有序的,进行截断 - - for (size_t i = 0; i < field_segs.size(); i++) { - const char * seg_pos = strchr(field_segs[i].c_str(), ':') ; - if (seg_pos == NULL || (seg_pos - field_segs[i].c_str() >= field_segs[i].length())) break; - - float value = atof(seg_pos + 1); - if (value < config.threshold1 && value < config.threshold2) break; - - // 开头有一个双引号 - itemID item_id = strtoull(field_segs[i].c_str() + 1, nullptr, 10); - if (value > config.threshold1) { - itemlist_pair.first.push_back(item_id); - } - if (value > config.threshold2) { - itemlist_pair.second.push_back(item_id); - } - } - - // 左侧必须有2个item,右侧必须有1个item,此时该用户才有可能给(item_i, item_j) 打分 - if (!(itemlist_pair.first.size() > 1 && itemlist_pair.second.size() > 0)) { - itemlist_pair.first.clear(); - itemlist_pair.second.clear(); - continue; - } - // 排序 - sort(itemlist_pair.first.begin(), itemlist_pair.first.end()); - sort(itemlist_pair.second.begin(), itemlist_pair.second.end()); - - // 合入i2u索引 - int idx = groups.size(); //待插入的index - for (auto item_id : itemlist_pair.first) { - pair_entry.first = item_id; - ins_i2u_ret = i2u_map.insert(pair_entry); - ins_i2u_ret.first->second.first.push_back(idx); - } - for (auto item_id : itemlist_pair.second) { - pair_entry.first = item_id; - ins_i2u_ret = i2u_map.insert(pair_entry); - ins_i2u_ret.first->second.second.push_back(idx); - } - - // 插入 u -> item_list索引 - groups.resize(groups.size()+1); - groups.back().first.swap(itemlist_pair.first); - groups.back().second.swap(itemlist_pair.second); - - } - - cout << currentTimetoStr() << " items num: " << i2u_map.size() << endl; - cout << currentTimetoStr() << " users num: " << groups.size() << endl; - cout << currentTimetoStr() << " sort.." << endl; - - for (auto iter : i2u_map) { - sort(iter.second.first.begin(), iter.second.first.end()); - sort(iter.second.second.begin(), iter.second.second.end()); - } - cout << currentTimetoStr() << " sort finished" << endl; - return 0; - -} - - -struct TaskOutput { - int id; - string output_path; - vector sim_list_len_statis; -}; - - -/* - * input parm: - * groups : u -> i index - * i2u_map : i -> u index - * output_path : path of write sim matrix - * - * output param: - * out - * - */ -int calc_sim_matrix(const Config & config, - const vector< pair , vector > > & groups, - const unordered_map, vector > > & i2u_map, - TaskOutput & out, - int task_id, int total_tasks -) { - - int users_num = groups.size(); - int items_num = i2u_map.size(); - if (items_num < 2) return -1; - - ofstream out_file(out.output_path); - if (out_file.fail()) { - cerr << currentTimetoStr() << " create out_file err: " << out.output_path << endl; - return -1; - } - - vector users_intersection_buffer; - vector items_intersection_buffer; - vector > sim_list_buff; - users_intersection_buffer.reserve(2048); - BitMap user_bm(users_num); - bool use_bitmap; - - out.sim_list_len_statis.resize(config.max_sim_list_len+1); - - int idx = 0; - for (auto & iter_i : i2u_map) { - // if ((idx++) % total_tasks != task_id) continue; - // 改进任务分配策略,避免不同线程计算相同的 itemID。上面是基于索引 idx 分配任务 - // 基于 itemID 的值进行分配,避免相同的 itemID 被多个线程处理。 - if (iter_i.first % total_tasks != task_id) continue; - - const vector & ulist_of_item_i = iter_i.second.first; - if (config.show_progress) { - fprintf(stdout, "\r%d of %d", idx++, items_num); - } - sim_list_buff.clear(); - - //use_bitmap = true; - use_bitmap = ulist_of_item_i.size() > 50; - /** - * 由全部使用有序数组求交,改为 长用bitmap,短的遍历,时长由 30 分钟 提升到 12分钟(users num 100w+) - * // bitmapsize长度(users num)100万+的情况下,这个阈值选取0(即全部使用bitmap),50和100,时长都差不多。但是还是保留这个逻辑,单user_list长度达到千万时,这里根据阈值做区分对待应该还是有必要 - */ - if (use_bitmap) { - for (auto user_id : ulist_of_item_i) { - user_bm.Set(user_id); - } - } - - for (auto & iter_j : i2u_map) { - if (iter_j.first == iter_i.first) continue; - - const vector & ulist_of_item_j = iter_j.second.second; - users_intersection_buffer.clear(); - // 交互过item_i, item_j的user_list - if (use_bitmap) { - for (auto user_id : ulist_of_item_j) { - if (user_bm.Existed(user_id)) { - users_intersection_buffer.push_back(user_id); - } - } - } else { - set_intersection(ulist_of_item_i.begin(), ulist_of_item_i.end(), ulist_of_item_j.begin(), ulist_of_item_j.end(), back_inserter(users_intersection_buffer)); - } - - if (users_intersection_buffer.size() < 2) continue; - // user_i, user_j - - float sim_of_item_i_j = 0.0; - // 遍历共同交互过(item_i, item_j)的user组合(user_i, user_j) - for (vector::const_iterator user_i = users_intersection_buffer.begin() + 1; - user_i != users_intersection_buffer.end(); - ++user_i) { - - const vector & item_list_of_user_i = groups[*user_i].first; // 使用first - for (vector::const_iterator user_j = users_intersection_buffer.begin(); - user_j != user_i; - ++user_j) { - - const vector & item_list_of_user_j = groups[*user_j].first; // 使用first - items_intersection_buffer.clear(); - - // 求交集 - set_intersection(item_list_of_user_i.begin(), item_list_of_user_i.end(), - item_list_of_user_j.begin(), item_list_of_user_j.end(), - back_inserter(items_intersection_buffer)); - - sim_of_item_i_j += 1.0 / (config.alpha + items_intersection_buffer.size()); - } - } - sim_list_buff.push_back(make_pair(iter_j.first, sim_of_item_i_j)); - } - - if (use_bitmap) { - for (auto user_id : ulist_of_item_i) { - user_bm.ResetRoughly(user_id); - } - } - - int sim_list_len = sim_list_buff.size(); - if (sim_list_len > 0) { - - sort(sim_list_buff.begin(), sim_list_buff.end(), compare_pairs); - - out_file << iter_i.first << "\t" << sim_list_buff[0].first << ":" << sim_list_buff[0].second; - - if (sim_list_len > config.max_sim_list_len) sim_list_len = config.max_sim_list_len; - - out.sim_list_len_statis[sim_list_len] += 1; - - for (int i = 1; i < sim_list_len; i++) { - out_file << ',' << sim_list_buff[i].first << ':' << sim_list_buff[i].second; - } - out_file << endl; - } - - } - - out_file.close(); - return 0; -} - -void printSimMatrixStatisInfo(string task_name, const vector & sim_list_len_statis) { - // staits info of sim matrix - int sum_groups = accumulate(sim_list_len_statis.begin(), sim_list_len_statis.end(), (int)0); - cout << currentTimetoStr() << " ========== TASK STATIS INFO [" << task_name << "]==========" << endl; - cout << currentTimetoStr() << " write sim matrix finished" << endl; - cout << currentTimetoStr() << " print staits info of sim matrix... " << sim_list_len_statis.size() << endl; - cout << currentTimetoStr() << " total keys: " << sum_groups << endl; - - int accumulate = 0; - for (int i = sim_list_len_statis.size() - 1; i >= 0; i--) { - accumulate += sim_list_len_statis[i]; - if (i % 20 == 0) { - // 注意 为防止输出太多,间隔20输出一行,所以num与上一行的累加不会等于accumulate - fprintf(stdout, "simlist_len %4d, num %4d, accumulate %6d accumulated_rate %5.2f%\%\n", - (int) i, sim_list_len_statis[i], accumulate, 100.0 * accumulate / sum_groups); - } - } -} - -int main(int argc,char *argv[]) { - - Config config; - int ret = config.load(argc, argv); - if (ret < 0) { - cerr << currentTimetoStr() << " load_config err: " << ret << endl; - return ret; - } - - cout << currentTimetoStr() << " start load raw user_session data ... " << endl; - - vector< pair , vector > > groups; - groups.reserve(config.user_sessions_num); - - unordered_map, vector > > i2u_map; - i2u_map.reserve(config.items_num); - - ret = load_data(config, groups, i2u_map); - if (ret < 0) { - cerr << currentTimetoStr() << " load_data err: " << ret << endl; - return ret; - } - cout << currentTimetoStr() << " load raw user_session data finished. " << endl; - - vector outs; - outs.resize(config.thread_num); - - vector threads; - char out_path[256]; - for (int task_id = 0; task_id < config.thread_num; task_id++) { - outs[task_id].id = task_id; - - snprintf(out_path, sizeof(out_path), "%s/sim_matrx.%0.1f_%0.3f_%0.3f.%d", config.output_path.c_str(), config.alpha, config.threshold1, config.threshold2, task_id); - outs[task_id].output_path = out_path; - threads.push_back(thread(calc_sim_matrix, std::cref(config), std::cref(groups), std::cref(i2u_map), std::ref(outs[task_id]), task_id, config.thread_num)); - } - - // wait all tasks - cout << endl; - cout << currentTimetoStr() << " wait sim_calc threads ... " << endl; - std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join)); - cout << currentTimetoStr() << " all sim_calc tasks finished" << endl; - - // merge outputs - TaskOutput merged_output; - vector & sim_list_len_statis = merged_output.sim_list_len_statis; - for (auto & out_task_i : outs) { - string task_name = std::to_string(out_task_i.id) + " " + out_task_i.output_path; - printSimMatrixStatisInfo(task_name, out_task_i.sim_list_len_statis); - - vector & list_i = out_task_i.sim_list_len_statis; - if (sim_list_len_statis.size() < list_i.size()) { - sim_list_len_statis.resize(list_i.size()); - } - for (size_t j = 0; j < list_i.size(); j++) { - sim_list_len_statis[j] += list_i[j]; - } - } - - printSimMatrixStatisInfo("Merged", sim_list_len_statis); - - return 0; -} diff --git a/collaboration/src/swing_symmetric.cc b/collaboration/src/swing_symmetric.cc deleted file mode 100644 index 721a031..0000000 --- a/collaboration/src/swing_symmetric.cc +++ /dev/null @@ -1,234 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "utils.h" -#include "BitMap.h" - -int max_sim_list_len = 300; - -using namespace std; - -typedef unsigned long long item_id_t; // 定义64位无符号整型作为item ID - -// 比较函数,用于排序时按item_id_t来比较 -bool compare_i2ulist_map_iters2(const unordered_map>::const_iterator &a, - const unordered_map>::const_iterator &b) { - return a->first < b->first; -} - -// 比较函数,用于sim_list排序 -bool compare_pairs2(const pair &a, const pair &b) { - return a.second > b.second; -} - -int main(int argc, char *argv[]) { - - float alpha = 0.5; - float threshold = 0.5; - int show_progress = 0; - - if (argc < 4) { - cerr << "usage " << argv[0] << " alpha threshold show_progress(0/1)" << endl; - return -1; - } - - alpha = atof(argv[1]); - threshold = atof(argv[2]); - show_progress = atoi(argv[3]); - - cerr << currentTimetoStr() << " start... " << endl; - cerr << " alpha " << alpha << endl; - cerr << " threshold " << threshold << endl; - - unordered_map> i2u_map; - i2u_map.reserve(160000); - - string line_buff; - const string delimiters(","); - - vector field_segs; - vector> groups; // Changed to store item_id_t - groups.reserve(2000000); - vector item_list; - - vector items_intersection_buffer; - vector users_intersection_buffer; - users_intersection_buffer.reserve(2000); - - pair> pair_entry; - pair>::iterator, bool> ins_i2u_ret; - - while (getline(cin, line_buff)) { - // 格式是一个json,所以要把开头和结尾的括号去掉 - line_buff.erase(0, line_buff.find_first_not_of("{")); - line_buff.erase(line_buff.find_last_not_of("}") + 1); - field_segs.clear(); - split(field_segs, line_buff, delimiters); - - item_list.clear(); - for (size_t i = 0; i < field_segs.size(); i++) { - const char *seg_pos = strchr(field_segs[i].c_str(), ':'); - if (seg_pos == NULL || (seg_pos - field_segs[i].c_str() >= field_segs[i].length())) break; - - float value = atof(seg_pos + 1); - if (value > threshold) { - // 开头有一个双引号 - item_id_t item_id = strtoull(field_segs[i].c_str() + 1, NULL, 10); - item_list.push_back(item_id); - } - } - - if (item_list.size() < 2) continue; - // 排序 - sort(item_list.begin(), item_list.end()); - - // append本次的itemlist - int idx = groups.size(); - groups.push_back(item_list); // item_list is now of type item_id_t - // 合入i2u索引 - for (vector::const_iterator iter = item_list.begin(); iter != item_list.end(); ++iter) { - pair_entry.first = *iter; - ins_i2u_ret = i2u_map.insert(pair_entry); - ins_i2u_ret.first->second.push_back(idx); - } - } - - int items_num = i2u_map.size(); - int users_num = groups.size(); - cerr << currentTimetoStr() << " items num: " << i2u_map.size() << endl; - cerr << currentTimetoStr() << " users num: " << groups.size() << endl; - cerr << currentTimetoStr() << " sort.." << endl; - - vector>::const_iterator> sorted_i_ulist_pairs; - - for (unordered_map>::iterator iter = i2u_map.begin(); iter != i2u_map.end(); ++iter) { - sorted_i_ulist_pairs.push_back(iter); - sort(iter->second.begin(), iter->second.end()); - } - cerr << currentTimetoStr() << " sort finished" << endl; - - sort(sorted_i_ulist_pairs.begin(), sorted_i_ulist_pairs.end(), compare_i2ulist_map_iters2); - - if (items_num < 2) return -1; - - vector> sim_list_buff; - unordered_map>> sim_matrix; - sim_matrix.reserve(items_num); - - int idx = 0; - - BitMap user_bm(users_num); - bool use_bitmap; - vector sim_list_len_statis; - sim_list_len_statis.resize(max_sim_list_len + 1); - - for (int i = 1; i < sorted_i_ulist_pairs.size(); ++i) { - unordered_map>::const_iterator pair_i = sorted_i_ulist_pairs[i]; - if (show_progress) { - fprintf(stderr, "\r%d of %d", idx++, items_num); - } - sim_list_buff.clear(); - - use_bitmap = pair_i->second.size() > 50; - - if (use_bitmap) { - for (vector::const_iterator iter_pair_i = pair_i->second.begin(); iter_pair_i != pair_i->second.end(); ++iter_pair_i) { - user_bm.Set(*iter_pair_i); - } - } - - for (int j = 0; j < i; ++j) { - unordered_map>::const_iterator pair_j = sorted_i_ulist_pairs[j]; - users_intersection_buffer.clear(); - - if (use_bitmap) { - for (vector::const_iterator iter_pair_j = pair_j->second.begin(); iter_pair_j != pair_j->second.end(); ++iter_pair_j) { - if (user_bm.Existed(*iter_pair_j)) { - users_intersection_buffer.push_back(*iter_pair_j); - } - } - } else { - set_intersection(pair_i->second.begin(), pair_i->second.end(), pair_j->second.begin(), pair_j->second.end(), back_inserter(users_intersection_buffer)); - } - - if (users_intersection_buffer.size() < 2) continue; - - float sim_of_item_i_j = 0.0; - for (vector::const_iterator user_i = users_intersection_buffer.begin() + 1; - user_i != users_intersection_buffer.end(); - ++user_i) { - - const vector &item_list_of_user_i = groups[*user_i]; - - for (vector::const_iterator user_j = users_intersection_buffer.begin(); - user_j != user_i; - ++user_j) { - - const vector &item_list_of_user_j = groups[*user_j]; - items_intersection_buffer.clear(); - set_intersection(item_list_of_user_i.begin(), item_list_of_user_i.end(), item_list_of_user_j.begin(), item_list_of_user_j.end(), back_inserter(items_intersection_buffer)); - - sim_of_item_i_j += 1.0 / (alpha + items_intersection_buffer.size()); - } - } - sim_list_buff.push_back(make_pair(pair_j->first, sim_of_item_i_j)); - } - - sim_matrix[pair_i->first] = sim_list_buff; - for (auto &p : sim_list_buff) { - sim_matrix[p.first].push_back(make_pair(pair_i->first, p.second)); - } - if (use_bitmap) { - for (vector::const_iterator iter_pair_i = pair_i->second.begin(); iter_pair_i != pair_i->second.end(); ++iter_pair_i) { - user_bm.ResetRoughly(*iter_pair_i); - } - } - } - - for (auto &p : sim_matrix) { - vector> &sim_list = p.second; - int sim_list_len = p.second.size(); - if (sim_list_len > 0) { - sort(sim_list.begin(), sim_list.end(), compare_pairs2); - - cout << p.first << "\t" << sim_list[0].first << ":" << sim_list[0].second; - - if (sim_list_len > max_sim_list_len) { - sim_list_len = max_sim_list_len; - } - - sim_list_len_statis[sim_list_len] += 1; - - for (int i = 1; i < sim_list_len; i++) { - cout << ',' << sim_list[i].first << ':' << sim_list[i].second; - } - cout << endl; - } - } - - int sum_groups = accumulate(sim_list_len_statis.begin(), sim_list_len_statis.end(), 0); - cerr << currentTimetoStr() << " write sim matrix finished" << endl; - cerr << currentTimetoStr() << " print stats info of sim matrix... " << sim_list_len_statis.size() << endl; - cerr << currentTimetoStr() << " total keys: " << sum_groups << endl; - - int accumulate = 0; - for (int i = sim_list_len_statis.size() - 1; i >= 0; i--) { - accumulate += sim_list_len_statis[i]; - fprintf(stderr, "simlist_len %4d, num %4d, accumulate %6d accumulated_rate %5.2f%%\n", - i, sim_list_len_statis[i], accumulate, 100.0 * accumulate / sum_groups); - } - - return 0; -} diff --git a/collaboration/src/ucf.py b/collaboration/src/ucf.py deleted file mode 100644 index db93dbf..0000000 --- a/collaboration/src/ucf.py +++ /dev/null @@ -1,145 +0,0 @@ -import sys -import json -import logging -from collections import defaultdict -from sklearn.metrics.pairwise import cosine_similarity -import numpy as np - -# 日志配置 -logging.basicConfig(filename='logs/ucf.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -# 输入数据,用户对item的评分 -# 暂定为0.0,也就是所有的行为都会进行考虑。如果要过滤掉只有一次点击的,可以设定为1.1,1分是一次点击,有点击阅读页或者多次点击就会达到2分以上 -user_rating_threshold = 0.0 -# 当某个用于基于最近邻推荐输出的item list低于多少时不输出 -least_items_size_to_output = 5 -# 每个用户输出的top_k -top_k = 50 - -# 本模块的主要特点: -# 读取数据并合并同一个用户的多行记录:同一个用户可能会出现在多行中,对同一个用户的多行记录进行了合并。 -# 计算用户之间的相似性:用户协同过滤的关键是计算用户之间的相似度。为了加速计算,可以使用基于向量化的余弦相似度,而避免直接计算两两用户之间的相似度。 -# 为每个用户推荐物品:根据相似用户的评分,为每个用户推荐新的物品,并计算推荐得分。 - - -# 读取数据,并合并同一个用户的多行记录 -def read_input(input_file): - user_items = defaultdict(dict) - - with open(input_file, 'r') as f: - for line_num, line in enumerate(f, 1): - try: - uid, items_str = line.strip().split('\t') - items = json.loads(items_str) - for item_id, score in items.items(): - if score < user_rating_threshold: - continue - if item_id in user_items[uid]: - user_items[uid][item_id] += score # 合并相同用户的评分 - else: - user_items[uid][item_id] = score - except ValueError as ve: - logging.error(f"Data format error at line {line_num}: {line.strip()}. Error: {ve}") - except json.JSONDecodeError as je: - logging.error(f"JSON parse error at line {line_num}: {line.strip()}. Error: {je}") - - logging.info(f"Input data loaded from {input_file}. Total users: {len(user_items)}") - return user_items - -# 基于物品评分构建用户-物品矩阵 -def build_user_item_matrix(user_items): - all_items = set() - for items in user_items.values(): - all_items.update(items.keys()) - - item_list = list(all_items) - item_index = {item_id: idx for idx, item_id in enumerate(item_list)} - - user_list = list(user_items.keys()) - user_index = {uid: idx for idx, uid in enumerate(user_list)} - - user_item_matrix = np.zeros((len(user_list), len(item_list))) - - for uid, items in user_items.items(): - for item_id, score in items.items(): - user_item_matrix[user_index[uid]][item_index[item_id]] = score - - logging.info(f"User-item matrix built with shape: {user_item_matrix.shape}") - - return user_item_matrix, user_list, item_list, user_index, item_index - -# 基于余弦相似度计算用户相似性矩阵 -def compute_user_similarity(user_item_matrix): - similarity_matrix = cosine_similarity(user_item_matrix) - logging.info("User similarity matrix computed.") - return similarity_matrix - -# 基于相似用户为每个用户推荐物品 -def recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k=50): - recommendations = defaultdict(dict) - - for uid in user_list: - u_idx = user_index[uid] - similar_users = np.argsort(-similarity_matrix[u_idx])[:top_k] # 取前top_k个相似用户 - - # 遍历这些相似用户的物品,累积推荐得分 - item_scores = defaultdict(float) - for sim_uid_idx in similar_users: - if sim_uid_idx == u_idx: # 跳过自己 - continue - sim_uid = user_list[sim_uid_idx] - for item_id, score in user_items[sim_uid].items(): - if item_id not in user_items[uid]: # 只推荐未交互过的物品 - item_scores[item_id] += score * similarity_matrix[u_idx][sim_uid_idx] - - # 将得分最高的物品推荐给用户 - recom_list = {item_id: score for item_id, score in sorted(item_scores.items(), key=lambda x: -x[1])[:top_k]} - if len(recom_list) > least_items_size_to_output: - recommendations[uid] = recom_list - - logging.info("Recommendations computed for all users.") - return recommendations - -# 输出推荐结果 -def write_output(recommendations, output_file): - try: - with open(output_file, 'w') as f: - for uid, rec_items in recommendations.items(): - rec_str = ",".join([f"{item_id}:{score:.2f}" for item_id, score in rec_items.items()]) - f.write(f"{uid}\t{rec_str}\n") - logging.info(f"Recommendations written to {output_file}.") - except Exception as e: - logging.error(f"Error writing recommendations to {output_file}: {e}") - -def main(): - if len(sys.argv) != 3: - print("Usage: python recommend.py ") - logging.error("Invalid number of arguments. Expected 2 arguments: input_file and output_file.") - sys.exit(1) - - input_file = sys.argv[1] - output_file = sys.argv[2] - - logging.info(f"Starting recommendation process. Input file: {input_file}, Output file: {output_file}") - - # Step 1: 读取并合并输入 - user_items = read_input(input_file) - - if not user_items: - logging.error(f"No valid user-item data found in {input_file}. Exiting.") - sys.exit(1) - - # Step 2: 构建用户-物品矩阵 - user_item_matrix, user_list, item_list, user_index, item_index = build_user_item_matrix(user_items) - - # Step 3: 计算用户相似性 - similarity_matrix = compute_user_similarity(user_item_matrix) - - # Step 4: 为用户推荐物品 - recommendations = recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k) - - # Step 5: 输出推荐结果 - write_output(recommendations, output_file) - -if __name__ == '__main__': - main() diff --git a/collaboration/utils/utils.cc b/collaboration/utils/utils.cc deleted file mode 100644 index c3231b0..0000000 --- a/collaboration/utils/utils.cc +++ /dev/null @@ -1,55 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -//#include -#include - -#include -#include -void split(std::vector& tokens, const std::string& s, const std::string& delimiters = " ") -{ - using namespace std; - string::size_type lastPos = s.find_first_not_of(delimiters, 0); - string::size_type pos = s.find_first_of(delimiters, lastPos); - while (string::npos != pos || string::npos != lastPos) { - tokens.push_back(s.substr(lastPos, pos - lastPos));//use emplace_back after C++11 - lastPos = s.find_first_not_of(delimiters, pos); - pos = s.find_first_of(delimiters, lastPos); - } -} - -std::string currentTimetoStr(void) { - char tmp[64]; - time_t t = time(NULL); - tm *_tm = localtime(&t); - int year = _tm->tm_year+1900; - int month = _tm->tm_mon+1; - int date = _tm->tm_mday; - int hh = _tm->tm_hour; - int mm = _tm->tm_min; - int ss = _tm->tm_sec; - sprintf(tmp,"%04d-%02d-%02d %02d:%02d:%02d", year,month,date,hh,mm,ss); - return std::string(tmp); -} - - -bool compare_i2ulist_map_iters(const std::unordered_map >::const_iterator & a, const std::unordered_map >::const_iterator & b) { - // vector长的排序后面 - return a->second.size() < b->second.size(); -} - -bool compare_pairs(const std::pair & a, const std::pair & b) { - // 分数大的排前面 - return a.second > b.second; -} - diff --git a/offline_tasks/collaboration/.gitignore b/offline_tasks/collaboration/.gitignore new file mode 100644 index 0000000..259148f --- /dev/null +++ b/offline_tasks/collaboration/.gitignore @@ -0,0 +1,32 @@ +# Prerequisites +*.d + +# Compiled Object files +*.slo +*.lo +*.o +*.obj + +# Precompiled Headers +*.gch +*.pch + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai +*.la +*.a +*.lib + +# Executables +*.exe +*.out +*.app diff --git a/offline_tasks/collaboration/Makefile b/offline_tasks/collaboration/Makefile new file mode 100644 index 0000000..d6122d1 --- /dev/null +++ b/offline_tasks/collaboration/Makefile @@ -0,0 +1,44 @@ +# Build targets + +USER_FLAGS = -Wno-unused-result -Wno-unused-but-set-variable -Wno-sign-compare -Wall +USER_LIBS = + +# Compiler flags +CXX = g++ -std=c++11 +CXXFLAGS = $(USER_FLAGS) -O3 -I ./include +LDFLAGS = -lpthread + +# The names of the executables that will be built +target_swing = bin/swing +target_icf_simple = bin/icf_simple +target_swing_symmetric = bin/swing_symmetric + +# Ensure the bin directory exists +BIN_DIR = bin + +# Declare phony targets +.PHONY: all clean + +# Build all targets +all: $(BIN_DIR) $(target_swing) $(target_icf_simple) $(target_swing_symmetric) + +# Create bin directory if it doesn't exist +$(BIN_DIR): + mkdir -p $(BIN_DIR) + +# Build target swing +$(target_swing): src/swing.cc utils/utils.cc include/* + $(CXX) $(LDFLAGS) -o $(target_swing) src/swing.cc utils/utils.cc $(CXXFLAGS) + +# Build target swing_1st_order +$(target_icf_simple): src/icf_simple.cc utils/utils.cc include/* + $(CXX) $(LDFLAGS) -o $(target_icf_simple) src/icf_simple.cc utils/utils.cc $(CXXFLAGS) + +# Build target swing_symmetric +$(target_swing_symmetric): src/swing_symmetric.cc utils/utils.cc include/* + $(CXX) $(LDFLAGS) -o $(target_swing_symmetric) src/swing_symmetric.cc utils/utils.cc $(CXXFLAGS) + +# Clean build files +clean: + rm -f $(target_swing) $(target_icf_simple) $(target_swing_symmetric) + find . -name '*.o' -delete diff --git a/offline_tasks/collaboration/README.md b/offline_tasks/collaboration/README.md new file mode 100644 index 0000000..f999279 --- /dev/null +++ b/offline_tasks/collaboration/README.md @@ -0,0 +1,17 @@ + +协同算法 + +item协同: +标准swing算法 +swing_symmetric.cc + +改进的(非对称)swing算法 +swing.cc + +简单的item协同: +icf_simple.cc + + +用户协同: +ucf.py + diff --git a/offline_tasks/collaboration/Swing快速开始.md b/offline_tasks/collaboration/Swing快速开始.md new file mode 100644 index 0000000..8efd347 --- /dev/null +++ b/offline_tasks/collaboration/Swing快速开始.md @@ -0,0 +1,70 @@ +# Swing算法快速开始 + +## 快速运行(3步) + +### 1. 生成Session文件 + +```bash +cd /home/tw/recommendation/offline_tasks +python3 scripts/generate_session.py --lookback_days 730 +``` + +这会生成: +- `output/session.txt.YYYYMMDD` - 标准格式(uid \t json) +- `output/session.txt.YYYYMMDD.cpp` - C++格式(纯json) + +### 2. 运行Swing算法 + +```bash +cd /home/tw/recommendation/collaboration +bash run.sh +``` + +### 3. 查看结果 + +```bash +# 查看可读版本(带商品名称) +cat output/swing_similar_readable.txt | head -20 + +# 或查看原始版本(仅ID) +cat output/swing_similar.txt | head -20 +``` + +## 输出文件 + +- `output_YYYYMMDD/swing_similar.txt` - Swing相似度结果(ID格式) +- `output_YYYYMMDD/swing_similar_readable.txt` - 可读版本(ID:名称格式) + +## 配置修改 + +如需调整参数,编辑 `run.sh`: + +```bash +# 数据路径 +SESSION_DATA_DIR="../offline_tasks/output" + +# Swing参数 +ALPHA=0.7 # 0.5-1.0,越小越关注用户共同行为 +THRESHOLD1=1 # 1-5,交互强度阈值 +THRESHOLD2=3 # 1-10,相似度计算阈值 +THREAD_NUM=4 # 线程数 +``` + +## 详细文档 + +查看完整文档:`../offline_tasks/SWING_USAGE.md` + +## 常见问题 + +**Q: 如何查看商品名称?** +A: 结果文件 `swing_similar_readable.txt` 已自动添加商品名称 + +**Q: 如何调整相似商品数量?** +A: 修改 `src/swing.cc` 中的 `max_sim_list_len`(默认300) + +**Q: Session文件找不到?** +A: 先运行步骤1生成session文件 + +**Q: 运行时间?** +A: 1万商品约1-5分钟,10万商品约10-30分钟 + diff --git a/offline_tasks/collaboration/bin/icf_simple b/offline_tasks/collaboration/bin/icf_simple new file mode 100755 index 0000000..3e2f6ca Binary files /dev/null and b/offline_tasks/collaboration/bin/icf_simple differ diff --git a/offline_tasks/collaboration/bin/swing b/offline_tasks/collaboration/bin/swing new file mode 100755 index 0000000..7e901cf Binary files /dev/null and b/offline_tasks/collaboration/bin/swing differ diff --git a/offline_tasks/collaboration/bin/swing_symmetric b/offline_tasks/collaboration/bin/swing_symmetric new file mode 100755 index 0000000..0ce2135 Binary files /dev/null and b/offline_tasks/collaboration/bin/swing_symmetric differ diff --git a/offline_tasks/collaboration/eval.py b/offline_tasks/collaboration/eval.py new file mode 100644 index 0000000..7cdcef1 --- /dev/null +++ b/offline_tasks/collaboration/eval.py @@ -0,0 +1,105 @@ +#!/home/SanJunipero/anaconda3/bin/python +# -*- coding:UTF-8 -*- +import os,sys,json,re,time +import numpy as np +import pandas as pd +from itertools import combinations +import logging +import traceback +import cgitb +from argparse import ArgumentParser + +sim_index = {} + +max_fea = 20 #最多用x个历史交互id去召回 +max_recall_len = 1200 + +def para_define(parser): + parser.add_argument('-s', '--sim_index', type=str, default='') + +def parse_sim_item_pair(x): + x = x.split(':') + return (int(x[0]), float(x[1])) + +def parse_session_item_pair(x): + x = x.split(':') + return (int(x[0][1:-1]), float(x[1])) + +def run_eval(FLAGS): + with open(FLAGS.sim_index) as f: + for line in f: + segs = line.rstrip().split('\t') + if len(segs) != 2: + continue + k, vlist = segs + sim_index[int(k)] = [parse_sim_item_pair(x) for x in vlist.split(',')] + + statis = [] + for line in sys.stdin: + line = line.strip() + segs = line.split('\t') + uid = segs[0] + session = segs[1][1:-1] + if not session: + continue + session_list = [parse_session_item_pair(x) for x in session.split(',')] + + score_list = {} + for item_id, wei in session_list[1:1+max_fea]: + for sim_item_id, sim_value in sim_index.get(item_id, []): + score_list.setdefault(sim_item_id, 0.0) + score_list[sim_item_id] += wei*sim_value + score_list.items() + sorted_score_list = sorted(score_list.items(), key = lambda k:k[1], reverse=True)[:max_recall_len] + + target_item_id = session_list[0][0] + hit_pos = -1 + for idx, (k, v) in enumerate(sorted_score_list): + if target_item_id == k: + hit_pos = idx + break + + if hit_pos == -1 or hit_pos > max_recall_len: + hit_pos = max_recall_len + info = (1, hit_pos, len(sorted_score_list), + int(hit_pos < 25), + int(hit_pos < 50), + int(hit_pos < 100), + int(hit_pos < 200), + int(hit_pos < 400), + int(hit_pos < 800), + int(hit_pos < max_recall_len), + ) + statis.append(info) + statis = np.array(statis) + + desc = '''(1, hit_pos, len(sorted_score_list), + int(hit_pos != -1 and hit_pos < 25), + int(hit_pos != -1 and hit_pos < 50), + int(hit_pos != -1 and hit_pos < 100), + int(hit_pos != -1 and hit_pos < 200), + int(hit_pos != -1 and hit_pos < 400), + int(hit_pos != -1 and hit_pos < 800), + int(hit_pos != -1), + )''' + print(desc) + + np.set_printoptions(suppress=True) + print(FLAGS.sim_index, 'mean', '\t'.join([str(x) for x in statis.mean(axis=0)]), sep='\t') + print(FLAGS.sim_index, 'sum', '\t'.join([str(x) for x in statis.sum(axis=0)]), sep='\t') + + + +def main(): + cgitb.enable(format='text') + # op config + parser = ArgumentParser() + para_define(parser) + + FLAGS, unparsed = parser.parse_known_args() + print(FLAGS) + + run_eval(FLAGS) + +if __name__ == "__main__": + main() diff --git a/offline_tasks/collaboration/include/BitMap.h b/offline_tasks/collaboration/include/BitMap.h new file mode 100644 index 0000000..8ba251a --- /dev/null +++ b/offline_tasks/collaboration/include/BitMap.h @@ -0,0 +1,45 @@ +#include +#include + +using namespace std; + +class BitMap +{ +public: + BitMap(size_t num) + { + _v.resize((num >> 5) + 1); + } + + void Set(size_t num) //set 1 + { + size_t index = num >> 5; + size_t pos = num & 0x1F; + _v[index] |= (1 << pos); + } + + void Reset(size_t num) //set 0 + { + size_t index = num >> 5; + size_t pos = num & 0x1F; + _v[index] &= ~(1 << pos); + } + + // + void ResetRoughly(size_t num) //set 0 + { + size_t index = num >> 5; + _v[index] = 0; + } + + bool Existed(size_t num)//check whether it exists + { + size_t index = num >> 5; + size_t pos = num & 0x1F; + return (_v[index] & (1 << pos)); + } + +private: + vector _v; +}; + diff --git a/offline_tasks/collaboration/include/utils.h b/offline_tasks/collaboration/include/utils.h new file mode 100644 index 0000000..b305172 --- /dev/null +++ b/offline_tasks/collaboration/include/utils.h @@ -0,0 +1,42 @@ +#ifndef ___HEADER_SWING_UTILS___ +#define ___HEADER_SWING_UTILS___ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +template +std::ostream& operator<< (std::ostream& out, const std::vector& v) { + if (!v.empty()) { + out << '['; + std::copy(v.begin(), v.end(), std::ostream_iterator(out, ", ")); + out << "\b\b]"; + } + return out; +} + +std::string currentTimetoStr(void); + +void split(std::vector& tokens, const std::string& s, const std::string& delimiters = " "); + +bool compare_pairs(const std::pair & a, const std::pair & b); + + +bool compare_i2ulist_map_iters(const std::unordered_map >::const_iterator & a, const std::unordered_map >::const_iterator & b); + + +#endif diff --git a/offline_tasks/collaboration/run.sh b/offline_tasks/collaboration/run.sh new file mode 100644 index 0000000..201eff1 --- /dev/null +++ b/offline_tasks/collaboration/run.sh @@ -0,0 +1,134 @@ +#!/bin/bash +source ~/.bash_profile + +# ============================================================================ +# 配置区域 - 可根据实际情况修改 +# ============================================================================ + +# 数据路径配置 +# 修改这个路径指向实际的session文件位置 +SESSION_DATA_DIR="../offline_tasks/output" + +# Swing算法参数 +ALPHA=0.7 # Swing算法的alpha参数 +THRESHOLD1=1 # 交互强度阈值1 +THRESHOLD2=3 # 交互强度阈值2 +THREAD_NUM=4 # 线程数 +SHOW_PROGRESS=1 # 是否显示进度 (0/1) + +# Python环境(如果需要特定的Python环境,在这里配置) +PYTHON_CMD="python3" + +# ============================================================================ +# 脚本执行区域 +# ============================================================================ + +# 编译C++程序 +echo "编译Swing程序..." +make +if [[ $? -ne 0 ]]; then + echo "编译失败,退出" + exit 1 +fi + +# 获取日期 +DAY=`date +"%Y%m%d"` +# 如果需要使用特定日期,取消下面的注释 +# DAY=20241017 + +echo "处理日期: ${DAY}" + +# 清理旧的输出目录(365天前)和日志(180天前) +find . -type d -name 'output_*' -ctime +365 -exec rm -rf {} \; 2>/dev/null +mkdir -p logs +find logs/ -type f -mtime +180 -exec rm -f {} \; 2>/dev/null + +# 创建输出目录 +output_dir=output_${DAY} +mkdir -p ${output_dir} + +# 确定session文件路径 +# 优先使用带日期的文件,如果不存在则使用.cpp格式的文件 +SESSION_FILE="${SESSION_DATA_DIR}/session.txt.${DAY}.cpp" +if [[ ! -f ${SESSION_FILE} ]]; then + SESSION_FILE="${SESSION_DATA_DIR}/session.txt.${DAY}" +fi + +if [[ ! -f ${SESSION_FILE} ]]; then + echo "错误: Session文件不存在: ${SESSION_FILE}" + echo "请先运行 generate_session.py 生成session文件" + exit 1 +fi + +echo "使用session文件: ${SESSION_FILE}" +echo "Swing参数: alpha=${ALPHA}, threshold1=${THRESHOLD1}, threshold2=${THRESHOLD2}, threads=${THREAD_NUM}" + +# 运行Swing算法 +# 如果session文件格式是 "uid \t json",需要用cut -f 2提取json部分 +# 如果session文件格式是纯json(每行一个),直接cat即可 +echo "开始运行Swing算法..." +if grep -q $'\t' ${SESSION_FILE}; then + # 包含tab,需要提取第二列 + echo "检测到session文件包含uid,提取json部分..." + cat ${SESSION_FILE} | cut -f 2 | bin/swing ${ALPHA} ${THRESHOLD1} ${THRESHOLD2} ${THREAD_NUM} ${output_dir} ${SHOW_PROGRESS} +else + # 纯json格式 + echo "检测到session文件为纯json格式..." + cat ${SESSION_FILE} | bin/swing ${ALPHA} ${THRESHOLD1} ${THRESHOLD2} ${THREAD_NUM} ${output_dir} ${SHOW_PROGRESS} +fi + +# 检查Swing算法是否成功执行 +if [[ $? -eq 0 ]]; then + echo "Swing算法执行成功" + + # 更新软链接指向最新输出 + if [[ -e output ]]; then + rm -rf output + fi + ln -s "${output_dir}" output + echo "软链接已更新为指向 ${output_dir}" + + # 合并结果文件 + echo "合并结果文件..." + cat output/sim_matrx.* > output/swing_similar.txt + echo "结果已合并到 output/swing_similar.txt" + + # 生成可读的debug文件(添加商品名称) + echo "生成可读的debug文件..." + DEBUG_SCRIPT="../offline_tasks/scripts/add_names_to_swing.py" + + if [[ -f ${DEBUG_SCRIPT} ]]; then + ${PYTHON_CMD} ${DEBUG_SCRIPT} output/swing_similar.txt output/swing_similar_readable.txt --debug + + if [[ $? -eq 0 ]]; then + echo "Debug文件已生成: output/swing_similar_readable.txt" + else + echo "警告: 生成debug文件失败,但Swing结果已保存" + fi + else + echo "警告: Debug脚本不存在: ${DEBUG_SCRIPT}" + echo "跳过生成可读文件" + fi + +else + echo "Swing算法执行失败,未更新软链接" + exit 1 +fi + +# ============================================================================ +# 用户协同过滤(UCF)- 可选 +# ============================================================================ + +# 如果需要运行UCF,取消下面的注释 +# echo "运行用户协同过滤..." +# # 仅使用最新的5万条数据 +# tail -n 50000 ${SESSION_FILE} > output/ucf.input +# python3 src/ucf.py output/ucf.input output/ucf.txt + +echo "全部完成!" +echo "结果文件:" +echo " - Swing相似度: ${output_dir}/swing_similar.txt" +echo " - Swing可读版: ${output_dir}/swing_similar_readable.txt" + + + diff --git a/offline_tasks/collaboration/src/icf_simple.cc b/offline_tasks/collaboration/src/icf_simple.cc new file mode 100644 index 0000000..e389af2 --- /dev/null +++ b/offline_tasks/collaboration/src/icf_simple.cc @@ -0,0 +1,170 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +//#include +#include +#include +#include +#include +#include +#include +#include "utils.h" + +int max_sim_list_len = 300; + +using namespace std; + +// 定义 64 位无符号整型 +typedef uint64_t ItemID; + +int main(int argc,char *argv[]) { + + float threshold1 = 0.5; + float threshold2 = 0.5; + int show_progress = 0; + + if (argc < 5) { + cout << "usage " << argv[0] << " threshold1 threshold2 show_progress(0/1)" << endl; + return -1; + } + + threshold1 = atof(argv[1]); + threshold2 = atof(argv[2]); + show_progress = atoi(argv[3]); + + cerr << currentTimetoStr() << " start... " << endl; + cerr << " threshold1 " << threshold1 << endl; + cerr << " threshold2 " << threshold2 << endl; + + // 一阶关系(DB簇索引) + unordered_map > sim_by_1rs_relation_map(1000000); + //sim_by_1rs_relation_map.reserve(1000000); + + string line_buff; + const string delimiters(","); + + vector field_segs; + vector > item_list; + + while (getline(cin, line_buff)) { + // 格式是一个json,所以要把开头和结尾的括号去掉 + line_buff.erase(0, line_buff.find_first_not_of("{")); + line_buff.erase(line_buff.find_last_not_of("}") + 1); + field_segs.clear(); + split(field_segs, line_buff, delimiters); + + item_list.clear(); + for (size_t i = 0; i < field_segs.size(); i++) { + const char * seg_pos = strchr(field_segs[i].c_str(), ':'); + if (seg_pos == NULL || (seg_pos - field_segs[i].c_str() >= field_segs[i].length())) break; + + float value = atof(seg_pos + 1); + if (value > threshold1 || value > threshold2) { + // 开头有一个双引号 + ItemID item_id = static_cast(strtoull(field_segs[i].c_str() + 1, NULL, 10)); + item_list.push_back(make_pair(item_id, value)); + } + } + + if (item_list.size() < 2) continue; + + // append本次的itemlist + ItemID map_key = 0; + ItemID map_key_1 = 0; + ItemID map_key_2 = 0; + pair >::iterator, bool> ins_ret; + + for (vector >::const_iterator i = item_list.begin(); i != item_list.end(); ++i) { + map_key_1 = i->first; + for (vector >::const_iterator j = item_list.begin(); j != item_list.end(); ++j) { + map_key_2 = j->first; + + if (map_key_1 == map_key_2) continue; + + if (i->second > threshold1 && j->second > threshold2) { + map_key = (map_key_1 << 32) + map_key_2; + ins_ret = sim_by_1rs_relation_map.insert(make_pair(map_key, make_pair(1, j->second))); + if (!ins_ret.second) { + ins_ret.first->second.first += 1; + ins_ret.first->second.second += j->second; + } + } + if (j->second > threshold1 && i->second > threshold2) { + map_key = (map_key_2 << 32) + map_key_1; + ins_ret = sim_by_1rs_relation_map.insert(make_pair(map_key, make_pair(1, i->second))); + if (!ins_ret.second) { + ins_ret.first->second.first += 1; + ins_ret.first->second.second += i->second; + } + } + } + } + } + + unordered_map > > sim_matrix(200000); + // 计算item_i, item_j合并的打分,total_wei / num * math.log(1.5*num, 1.5). + pair > > pair_entry; + pair > >::iterator, bool> ins_ret; + + for (unordered_map >::iterator iter = sim_by_1rs_relation_map.begin(); iter != sim_by_1rs_relation_map.end(); ++iter) { + ItemID item1 = iter->first >> 32; + ItemID item2 = iter->first & 0xFFFFFFFF; + + int num = iter->second.first; + float total_wei = iter->second.second; + float merged_score = total_wei / num * log(1.5 * num); + + pair_entry.first = item1; + + ins_ret = sim_matrix.insert(pair_entry); + ins_ret.first->second.push_back(make_pair(item2, merged_score)); + } + + // staits info of sim matrix + vector sim_list_len_statis; + sim_list_len_statis.resize(max_sim_list_len + 1); + + // write sim matrix + for (unordered_map > >::iterator iter = sim_matrix.begin(); iter != sim_matrix.end(); ++iter) { + vector > & sim_list_buff = iter->second; + int sim_list_len = sim_list_buff.size(); + if (sim_list_len > 0) { + sort(sim_list_buff.begin(), sim_list_buff.end(), compare_pairs); + + cout << iter->first << "\t" << sim_list_buff[0].first << ":" << sim_list_buff[0].second; + + if (sim_list_len > max_sim_list_len) sim_list_len = max_sim_list_len; + + sim_list_len_statis[sim_list_len] += 1; + + for (int i = 1; i < sim_list_len; i++) { + cout << ',' << sim_list_buff[i].first << ':' << sim_list_buff[i].second; + } + cout << endl; + } + } + + // staits info of sim matrix + int sum_groups = accumulate(sim_list_len_statis.begin(), sim_list_len_statis.end(), (int)0); + cerr << currentTimetoStr() << " write sim matrix finished" << endl; + cerr << currentTimetoStr() << " print staits info of sim matrix... " << sim_list_len_statis.size() << endl; + cerr << currentTimetoStr() << " total keys: " << sum_groups << endl; + + int accumulate = 0; + for (int i = sim_list_len_statis.size() - 1; i > -1; i--) { + accumulate += sim_list_len_statis[i]; + fprintf(stderr, "simlist_len %4d, num %4d, accumulate %6d accumulated_rate %5.2f%\%\n", (int)i, sim_list_len_statis[i], accumulate, 100.0 * accumulate / sum_groups); + } + + return 0; +} diff --git a/offline_tasks/collaboration/src/swing.cc b/offline_tasks/collaboration/src/swing.cc new file mode 100644 index 0000000..6666c8c --- /dev/null +++ b/offline_tasks/collaboration/src/swing.cc @@ -0,0 +1,409 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "utils.h" +#include "BitMap.h" + +using namespace std; + +// 使用 typedef 定义 itemID 类型 +typedef unsigned long long itemID; + +class Config { +public: + Config() { + + user_sessions_num = 2000000; + items_num = 160000; + + max_sim_list_len = 300; + max_session_list_len = 100; + + threshold1 = 0.5; + threshold2 = 0.5; + alpha = 0.5; + thread_num = 20; + show_progress = 0; + output_path = "result"; + } + + int load(int argc,char *argv[]) { + if (argc < 7) { + cout << "usage " << argv[0] << " alpha threshold1 threshold2 thread_num output_path show_progress(0/1) " << endl; + return -1; + } + + alpha = atof(argv[1]); + threshold1 = atof(argv[2]); + threshold2 = atof(argv[3]); + + thread_num = atoi(argv[4]); + output_path = argv[5]; + show_progress = atoi(argv[6]); + + cout << currentTimetoStr() << " start... " << endl; + cout << " threshold1 " << threshold1 << endl; + cout << " threshold2 " << threshold2 << endl; + cout << " alpha " << alpha << endl; + return 0; + } + +public: + int user_sessions_num; + int items_num; + + int max_sim_list_len; // 输出相似itemlist 最大长度 + int max_session_list_len; // 输入的 用户行为列表,截断长度(按权重排序阶段) + float threshold1; + float threshold2; + float alpha; + float thread_num; + int show_progress; + string output_path; +}; + +/** + * + * read data from stdin + * format: + * 输入的itemlist必须是按照权重排序的 + * + * {"111":3.9332,"222":0.0382,"333":0.0376} + * {"444":13.2136,"555":2.1438,"666":1.3443,"777":0.6775} + * {"888":22.0632,"999":0.0016} + * + * parm : + * config + * groups : index of user_id -> items + * i2u_map : index of item -> users + */ +int load_data(const Config & config, + vector< pair , vector > > & groups, + unordered_map, vector > > & i2u_map) { + + string line_buff; + + const string delimiters(","); + + vector field_segs; + // 每个元素是一个user的两个itemlist,first是交互强度大于threshold1的itemList,后者是强度大于threshold2的itemList + pair , vector > itemlist_pair; + + + pair , vector > > pair_entry; + pair , vector > >::iterator, bool> ins_i2u_ret; + + while (getline(cin, line_buff)) { + //格式是一个json,所以要把开头和结尾的括号去掉 + line_buff.erase(0,line_buff.find_first_not_of("{")); + line_buff.erase(line_buff.find_last_not_of("}") + 1); + //cout << line_buff << " !!!" << endl; + field_segs.clear(); + split(field_segs, line_buff, delimiters); + if (field_segs.size() < config.max_session_list_len) { + field_segs.resize(config.max_session_list_len); + } + + // field_segs是按权重有序的,进行截断 + + for (size_t i = 0; i < field_segs.size(); i++) { + const char * seg_pos = strchr(field_segs[i].c_str(), ':') ; + if (seg_pos == NULL || (seg_pos - field_segs[i].c_str() >= field_segs[i].length())) break; + + float value = atof(seg_pos + 1); + if (value < config.threshold1 && value < config.threshold2) break; + + // 开头有一个双引号 + itemID item_id = strtoull(field_segs[i].c_str() + 1, nullptr, 10); + if (value > config.threshold1) { + itemlist_pair.first.push_back(item_id); + } + if (value > config.threshold2) { + itemlist_pair.second.push_back(item_id); + } + } + + // 左侧必须有2个item,右侧必须有1个item,此时该用户才有可能给(item_i, item_j) 打分 + if (!(itemlist_pair.first.size() > 1 && itemlist_pair.second.size() > 0)) { + itemlist_pair.first.clear(); + itemlist_pair.second.clear(); + continue; + } + // 排序 + sort(itemlist_pair.first.begin(), itemlist_pair.first.end()); + sort(itemlist_pair.second.begin(), itemlist_pair.second.end()); + + // 合入i2u索引 + int idx = groups.size(); //待插入的index + for (auto item_id : itemlist_pair.first) { + pair_entry.first = item_id; + ins_i2u_ret = i2u_map.insert(pair_entry); + ins_i2u_ret.first->second.first.push_back(idx); + } + for (auto item_id : itemlist_pair.second) { + pair_entry.first = item_id; + ins_i2u_ret = i2u_map.insert(pair_entry); + ins_i2u_ret.first->second.second.push_back(idx); + } + + // 插入 u -> item_list索引 + groups.resize(groups.size()+1); + groups.back().first.swap(itemlist_pair.first); + groups.back().second.swap(itemlist_pair.second); + + } + + cout << currentTimetoStr() << " items num: " << i2u_map.size() << endl; + cout << currentTimetoStr() << " users num: " << groups.size() << endl; + cout << currentTimetoStr() << " sort.." << endl; + + for (auto iter : i2u_map) { + sort(iter.second.first.begin(), iter.second.first.end()); + sort(iter.second.second.begin(), iter.second.second.end()); + } + cout << currentTimetoStr() << " sort finished" << endl; + return 0; + +} + + +struct TaskOutput { + int id; + string output_path; + vector sim_list_len_statis; +}; + + +/* + * input parm: + * groups : u -> i index + * i2u_map : i -> u index + * output_path : path of write sim matrix + * + * output param: + * out + * + */ +int calc_sim_matrix(const Config & config, + const vector< pair , vector > > & groups, + const unordered_map, vector > > & i2u_map, + TaskOutput & out, + int task_id, int total_tasks +) { + + int users_num = groups.size(); + int items_num = i2u_map.size(); + if (items_num < 2) return -1; + + ofstream out_file(out.output_path); + if (out_file.fail()) { + cerr << currentTimetoStr() << " create out_file err: " << out.output_path << endl; + return -1; + } + + vector users_intersection_buffer; + vector items_intersection_buffer; + vector > sim_list_buff; + users_intersection_buffer.reserve(2048); + BitMap user_bm(users_num); + bool use_bitmap; + + out.sim_list_len_statis.resize(config.max_sim_list_len+1); + + int idx = 0; + for (auto & iter_i : i2u_map) { + // if ((idx++) % total_tasks != task_id) continue; + // 改进任务分配策略,避免不同线程计算相同的 itemID。上面是基于索引 idx 分配任务 + // 基于 itemID 的值进行分配,避免相同的 itemID 被多个线程处理。 + if (iter_i.first % total_tasks != task_id) continue; + + const vector & ulist_of_item_i = iter_i.second.first; + if (config.show_progress) { + fprintf(stdout, "\r%d of %d", idx++, items_num); + } + sim_list_buff.clear(); + + //use_bitmap = true; + use_bitmap = ulist_of_item_i.size() > 50; + /** + * 由全部使用有序数组求交,改为 长用bitmap,短的遍历,时长由 30 分钟 提升到 12分钟(users num 100w+) + * // bitmapsize长度(users num)100万+的情况下,这个阈值选取0(即全部使用bitmap),50和100,时长都差不多。但是还是保留这个逻辑,单user_list长度达到千万时,这里根据阈值做区分对待应该还是有必要 + */ + if (use_bitmap) { + for (auto user_id : ulist_of_item_i) { + user_bm.Set(user_id); + } + } + + for (auto & iter_j : i2u_map) { + if (iter_j.first == iter_i.first) continue; + + const vector & ulist_of_item_j = iter_j.second.second; + users_intersection_buffer.clear(); + // 交互过item_i, item_j的user_list + if (use_bitmap) { + for (auto user_id : ulist_of_item_j) { + if (user_bm.Existed(user_id)) { + users_intersection_buffer.push_back(user_id); + } + } + } else { + set_intersection(ulist_of_item_i.begin(), ulist_of_item_i.end(), ulist_of_item_j.begin(), ulist_of_item_j.end(), back_inserter(users_intersection_buffer)); + } + + if (users_intersection_buffer.size() < 2) continue; + // user_i, user_j + + float sim_of_item_i_j = 0.0; + // 遍历共同交互过(item_i, item_j)的user组合(user_i, user_j) + for (vector::const_iterator user_i = users_intersection_buffer.begin() + 1; + user_i != users_intersection_buffer.end(); + ++user_i) { + + const vector & item_list_of_user_i = groups[*user_i].first; // 使用first + for (vector::const_iterator user_j = users_intersection_buffer.begin(); + user_j != user_i; + ++user_j) { + + const vector & item_list_of_user_j = groups[*user_j].first; // 使用first + items_intersection_buffer.clear(); + + // 求交集 + set_intersection(item_list_of_user_i.begin(), item_list_of_user_i.end(), + item_list_of_user_j.begin(), item_list_of_user_j.end(), + back_inserter(items_intersection_buffer)); + + sim_of_item_i_j += 1.0 / (config.alpha + items_intersection_buffer.size()); + } + } + sim_list_buff.push_back(make_pair(iter_j.first, sim_of_item_i_j)); + } + + if (use_bitmap) { + for (auto user_id : ulist_of_item_i) { + user_bm.ResetRoughly(user_id); + } + } + + int sim_list_len = sim_list_buff.size(); + if (sim_list_len > 0) { + + sort(sim_list_buff.begin(), sim_list_buff.end(), compare_pairs); + + out_file << iter_i.first << "\t" << sim_list_buff[0].first << ":" << sim_list_buff[0].second; + + if (sim_list_len > config.max_sim_list_len) sim_list_len = config.max_sim_list_len; + + out.sim_list_len_statis[sim_list_len] += 1; + + for (int i = 1; i < sim_list_len; i++) { + out_file << ',' << sim_list_buff[i].first << ':' << sim_list_buff[i].second; + } + out_file << endl; + } + + } + + out_file.close(); + return 0; +} + +void printSimMatrixStatisInfo(string task_name, const vector & sim_list_len_statis) { + // staits info of sim matrix + int sum_groups = accumulate(sim_list_len_statis.begin(), sim_list_len_statis.end(), (int)0); + cout << currentTimetoStr() << " ========== TASK STATIS INFO [" << task_name << "]==========" << endl; + cout << currentTimetoStr() << " write sim matrix finished" << endl; + cout << currentTimetoStr() << " print staits info of sim matrix... " << sim_list_len_statis.size() << endl; + cout << currentTimetoStr() << " total keys: " << sum_groups << endl; + + int accumulate = 0; + for (int i = sim_list_len_statis.size() - 1; i >= 0; i--) { + accumulate += sim_list_len_statis[i]; + if (i % 20 == 0) { + // 注意 为防止输出太多,间隔20输出一行,所以num与上一行的累加不会等于accumulate + fprintf(stdout, "simlist_len %4d, num %4d, accumulate %6d accumulated_rate %5.2f%\%\n", + (int) i, sim_list_len_statis[i], accumulate, 100.0 * accumulate / sum_groups); + } + } +} + +int main(int argc,char *argv[]) { + + Config config; + int ret = config.load(argc, argv); + if (ret < 0) { + cerr << currentTimetoStr() << " load_config err: " << ret << endl; + return ret; + } + + cout << currentTimetoStr() << " start load raw user_session data ... " << endl; + + vector< pair , vector > > groups; + groups.reserve(config.user_sessions_num); + + unordered_map, vector > > i2u_map; + i2u_map.reserve(config.items_num); + + ret = load_data(config, groups, i2u_map); + if (ret < 0) { + cerr << currentTimetoStr() << " load_data err: " << ret << endl; + return ret; + } + cout << currentTimetoStr() << " load raw user_session data finished. " << endl; + + vector outs; + outs.resize(config.thread_num); + + vector threads; + char out_path[256]; + for (int task_id = 0; task_id < config.thread_num; task_id++) { + outs[task_id].id = task_id; + + snprintf(out_path, sizeof(out_path), "%s/sim_matrx.%0.1f_%0.3f_%0.3f.%d", config.output_path.c_str(), config.alpha, config.threshold1, config.threshold2, task_id); + outs[task_id].output_path = out_path; + threads.push_back(thread(calc_sim_matrix, std::cref(config), std::cref(groups), std::cref(i2u_map), std::ref(outs[task_id]), task_id, config.thread_num)); + } + + // wait all tasks + cout << endl; + cout << currentTimetoStr() << " wait sim_calc threads ... " << endl; + std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join)); + cout << currentTimetoStr() << " all sim_calc tasks finished" << endl; + + // merge outputs + TaskOutput merged_output; + vector & sim_list_len_statis = merged_output.sim_list_len_statis; + for (auto & out_task_i : outs) { + string task_name = std::to_string(out_task_i.id) + " " + out_task_i.output_path; + printSimMatrixStatisInfo(task_name, out_task_i.sim_list_len_statis); + + vector & list_i = out_task_i.sim_list_len_statis; + if (sim_list_len_statis.size() < list_i.size()) { + sim_list_len_statis.resize(list_i.size()); + } + for (size_t j = 0; j < list_i.size(); j++) { + sim_list_len_statis[j] += list_i[j]; + } + } + + printSimMatrixStatisInfo("Merged", sim_list_len_statis); + + return 0; +} diff --git a/offline_tasks/collaboration/src/swing_symmetric.cc b/offline_tasks/collaboration/src/swing_symmetric.cc new file mode 100644 index 0000000..721a031 --- /dev/null +++ b/offline_tasks/collaboration/src/swing_symmetric.cc @@ -0,0 +1,234 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "utils.h" +#include "BitMap.h" + +int max_sim_list_len = 300; + +using namespace std; + +typedef unsigned long long item_id_t; // 定义64位无符号整型作为item ID + +// 比较函数,用于排序时按item_id_t来比较 +bool compare_i2ulist_map_iters2(const unordered_map>::const_iterator &a, + const unordered_map>::const_iterator &b) { + return a->first < b->first; +} + +// 比较函数,用于sim_list排序 +bool compare_pairs2(const pair &a, const pair &b) { + return a.second > b.second; +} + +int main(int argc, char *argv[]) { + + float alpha = 0.5; + float threshold = 0.5; + int show_progress = 0; + + if (argc < 4) { + cerr << "usage " << argv[0] << " alpha threshold show_progress(0/1)" << endl; + return -1; + } + + alpha = atof(argv[1]); + threshold = atof(argv[2]); + show_progress = atoi(argv[3]); + + cerr << currentTimetoStr() << " start... " << endl; + cerr << " alpha " << alpha << endl; + cerr << " threshold " << threshold << endl; + + unordered_map> i2u_map; + i2u_map.reserve(160000); + + string line_buff; + const string delimiters(","); + + vector field_segs; + vector> groups; // Changed to store item_id_t + groups.reserve(2000000); + vector item_list; + + vector items_intersection_buffer; + vector users_intersection_buffer; + users_intersection_buffer.reserve(2000); + + pair> pair_entry; + pair>::iterator, bool> ins_i2u_ret; + + while (getline(cin, line_buff)) { + // 格式是一个json,所以要把开头和结尾的括号去掉 + line_buff.erase(0, line_buff.find_first_not_of("{")); + line_buff.erase(line_buff.find_last_not_of("}") + 1); + field_segs.clear(); + split(field_segs, line_buff, delimiters); + + item_list.clear(); + for (size_t i = 0; i < field_segs.size(); i++) { + const char *seg_pos = strchr(field_segs[i].c_str(), ':'); + if (seg_pos == NULL || (seg_pos - field_segs[i].c_str() >= field_segs[i].length())) break; + + float value = atof(seg_pos + 1); + if (value > threshold) { + // 开头有一个双引号 + item_id_t item_id = strtoull(field_segs[i].c_str() + 1, NULL, 10); + item_list.push_back(item_id); + } + } + + if (item_list.size() < 2) continue; + // 排序 + sort(item_list.begin(), item_list.end()); + + // append本次的itemlist + int idx = groups.size(); + groups.push_back(item_list); // item_list is now of type item_id_t + // 合入i2u索引 + for (vector::const_iterator iter = item_list.begin(); iter != item_list.end(); ++iter) { + pair_entry.first = *iter; + ins_i2u_ret = i2u_map.insert(pair_entry); + ins_i2u_ret.first->second.push_back(idx); + } + } + + int items_num = i2u_map.size(); + int users_num = groups.size(); + cerr << currentTimetoStr() << " items num: " << i2u_map.size() << endl; + cerr << currentTimetoStr() << " users num: " << groups.size() << endl; + cerr << currentTimetoStr() << " sort.." << endl; + + vector>::const_iterator> sorted_i_ulist_pairs; + + for (unordered_map>::iterator iter = i2u_map.begin(); iter != i2u_map.end(); ++iter) { + sorted_i_ulist_pairs.push_back(iter); + sort(iter->second.begin(), iter->second.end()); + } + cerr << currentTimetoStr() << " sort finished" << endl; + + sort(sorted_i_ulist_pairs.begin(), sorted_i_ulist_pairs.end(), compare_i2ulist_map_iters2); + + if (items_num < 2) return -1; + + vector> sim_list_buff; + unordered_map>> sim_matrix; + sim_matrix.reserve(items_num); + + int idx = 0; + + BitMap user_bm(users_num); + bool use_bitmap; + vector sim_list_len_statis; + sim_list_len_statis.resize(max_sim_list_len + 1); + + for (int i = 1; i < sorted_i_ulist_pairs.size(); ++i) { + unordered_map>::const_iterator pair_i = sorted_i_ulist_pairs[i]; + if (show_progress) { + fprintf(stderr, "\r%d of %d", idx++, items_num); + } + sim_list_buff.clear(); + + use_bitmap = pair_i->second.size() > 50; + + if (use_bitmap) { + for (vector::const_iterator iter_pair_i = pair_i->second.begin(); iter_pair_i != pair_i->second.end(); ++iter_pair_i) { + user_bm.Set(*iter_pair_i); + } + } + + for (int j = 0; j < i; ++j) { + unordered_map>::const_iterator pair_j = sorted_i_ulist_pairs[j]; + users_intersection_buffer.clear(); + + if (use_bitmap) { + for (vector::const_iterator iter_pair_j = pair_j->second.begin(); iter_pair_j != pair_j->second.end(); ++iter_pair_j) { + if (user_bm.Existed(*iter_pair_j)) { + users_intersection_buffer.push_back(*iter_pair_j); + } + } + } else { + set_intersection(pair_i->second.begin(), pair_i->second.end(), pair_j->second.begin(), pair_j->second.end(), back_inserter(users_intersection_buffer)); + } + + if (users_intersection_buffer.size() < 2) continue; + + float sim_of_item_i_j = 0.0; + for (vector::const_iterator user_i = users_intersection_buffer.begin() + 1; + user_i != users_intersection_buffer.end(); + ++user_i) { + + const vector &item_list_of_user_i = groups[*user_i]; + + for (vector::const_iterator user_j = users_intersection_buffer.begin(); + user_j != user_i; + ++user_j) { + + const vector &item_list_of_user_j = groups[*user_j]; + items_intersection_buffer.clear(); + set_intersection(item_list_of_user_i.begin(), item_list_of_user_i.end(), item_list_of_user_j.begin(), item_list_of_user_j.end(), back_inserter(items_intersection_buffer)); + + sim_of_item_i_j += 1.0 / (alpha + items_intersection_buffer.size()); + } + } + sim_list_buff.push_back(make_pair(pair_j->first, sim_of_item_i_j)); + } + + sim_matrix[pair_i->first] = sim_list_buff; + for (auto &p : sim_list_buff) { + sim_matrix[p.first].push_back(make_pair(pair_i->first, p.second)); + } + if (use_bitmap) { + for (vector::const_iterator iter_pair_i = pair_i->second.begin(); iter_pair_i != pair_i->second.end(); ++iter_pair_i) { + user_bm.ResetRoughly(*iter_pair_i); + } + } + } + + for (auto &p : sim_matrix) { + vector> &sim_list = p.second; + int sim_list_len = p.second.size(); + if (sim_list_len > 0) { + sort(sim_list.begin(), sim_list.end(), compare_pairs2); + + cout << p.first << "\t" << sim_list[0].first << ":" << sim_list[0].second; + + if (sim_list_len > max_sim_list_len) { + sim_list_len = max_sim_list_len; + } + + sim_list_len_statis[sim_list_len] += 1; + + for (int i = 1; i < sim_list_len; i++) { + cout << ',' << sim_list[i].first << ':' << sim_list[i].second; + } + cout << endl; + } + } + + int sum_groups = accumulate(sim_list_len_statis.begin(), sim_list_len_statis.end(), 0); + cerr << currentTimetoStr() << " write sim matrix finished" << endl; + cerr << currentTimetoStr() << " print stats info of sim matrix... " << sim_list_len_statis.size() << endl; + cerr << currentTimetoStr() << " total keys: " << sum_groups << endl; + + int accumulate = 0; + for (int i = sim_list_len_statis.size() - 1; i >= 0; i--) { + accumulate += sim_list_len_statis[i]; + fprintf(stderr, "simlist_len %4d, num %4d, accumulate %6d accumulated_rate %5.2f%%\n", + i, sim_list_len_statis[i], accumulate, 100.0 * accumulate / sum_groups); + } + + return 0; +} diff --git a/offline_tasks/collaboration/src/ucf.py b/offline_tasks/collaboration/src/ucf.py new file mode 100644 index 0000000..db93dbf --- /dev/null +++ b/offline_tasks/collaboration/src/ucf.py @@ -0,0 +1,145 @@ +import sys +import json +import logging +from collections import defaultdict +from sklearn.metrics.pairwise import cosine_similarity +import numpy as np + +# 日志配置 +logging.basicConfig(filename='logs/ucf.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# 输入数据,用户对item的评分 +# 暂定为0.0,也就是所有的行为都会进行考虑。如果要过滤掉只有一次点击的,可以设定为1.1,1分是一次点击,有点击阅读页或者多次点击就会达到2分以上 +user_rating_threshold = 0.0 +# 当某个用于基于最近邻推荐输出的item list低于多少时不输出 +least_items_size_to_output = 5 +# 每个用户输出的top_k +top_k = 50 + +# 本模块的主要特点: +# 读取数据并合并同一个用户的多行记录:同一个用户可能会出现在多行中,对同一个用户的多行记录进行了合并。 +# 计算用户之间的相似性:用户协同过滤的关键是计算用户之间的相似度。为了加速计算,可以使用基于向量化的余弦相似度,而避免直接计算两两用户之间的相似度。 +# 为每个用户推荐物品:根据相似用户的评分,为每个用户推荐新的物品,并计算推荐得分。 + + +# 读取数据,并合并同一个用户的多行记录 +def read_input(input_file): + user_items = defaultdict(dict) + + with open(input_file, 'r') as f: + for line_num, line in enumerate(f, 1): + try: + uid, items_str = line.strip().split('\t') + items = json.loads(items_str) + for item_id, score in items.items(): + if score < user_rating_threshold: + continue + if item_id in user_items[uid]: + user_items[uid][item_id] += score # 合并相同用户的评分 + else: + user_items[uid][item_id] = score + except ValueError as ve: + logging.error(f"Data format error at line {line_num}: {line.strip()}. Error: {ve}") + except json.JSONDecodeError as je: + logging.error(f"JSON parse error at line {line_num}: {line.strip()}. Error: {je}") + + logging.info(f"Input data loaded from {input_file}. Total users: {len(user_items)}") + return user_items + +# 基于物品评分构建用户-物品矩阵 +def build_user_item_matrix(user_items): + all_items = set() + for items in user_items.values(): + all_items.update(items.keys()) + + item_list = list(all_items) + item_index = {item_id: idx for idx, item_id in enumerate(item_list)} + + user_list = list(user_items.keys()) + user_index = {uid: idx for idx, uid in enumerate(user_list)} + + user_item_matrix = np.zeros((len(user_list), len(item_list))) + + for uid, items in user_items.items(): + for item_id, score in items.items(): + user_item_matrix[user_index[uid]][item_index[item_id]] = score + + logging.info(f"User-item matrix built with shape: {user_item_matrix.shape}") + + return user_item_matrix, user_list, item_list, user_index, item_index + +# 基于余弦相似度计算用户相似性矩阵 +def compute_user_similarity(user_item_matrix): + similarity_matrix = cosine_similarity(user_item_matrix) + logging.info("User similarity matrix computed.") + return similarity_matrix + +# 基于相似用户为每个用户推荐物品 +def recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k=50): + recommendations = defaultdict(dict) + + for uid in user_list: + u_idx = user_index[uid] + similar_users = np.argsort(-similarity_matrix[u_idx])[:top_k] # 取前top_k个相似用户 + + # 遍历这些相似用户的物品,累积推荐得分 + item_scores = defaultdict(float) + for sim_uid_idx in similar_users: + if sim_uid_idx == u_idx: # 跳过自己 + continue + sim_uid = user_list[sim_uid_idx] + for item_id, score in user_items[sim_uid].items(): + if item_id not in user_items[uid]: # 只推荐未交互过的物品 + item_scores[item_id] += score * similarity_matrix[u_idx][sim_uid_idx] + + # 将得分最高的物品推荐给用户 + recom_list = {item_id: score for item_id, score in sorted(item_scores.items(), key=lambda x: -x[1])[:top_k]} + if len(recom_list) > least_items_size_to_output: + recommendations[uid] = recom_list + + logging.info("Recommendations computed for all users.") + return recommendations + +# 输出推荐结果 +def write_output(recommendations, output_file): + try: + with open(output_file, 'w') as f: + for uid, rec_items in recommendations.items(): + rec_str = ",".join([f"{item_id}:{score:.2f}" for item_id, score in rec_items.items()]) + f.write(f"{uid}\t{rec_str}\n") + logging.info(f"Recommendations written to {output_file}.") + except Exception as e: + logging.error(f"Error writing recommendations to {output_file}: {e}") + +def main(): + if len(sys.argv) != 3: + print("Usage: python recommend.py ") + logging.error("Invalid number of arguments. Expected 2 arguments: input_file and output_file.") + sys.exit(1) + + input_file = sys.argv[1] + output_file = sys.argv[2] + + logging.info(f"Starting recommendation process. Input file: {input_file}, Output file: {output_file}") + + # Step 1: 读取并合并输入 + user_items = read_input(input_file) + + if not user_items: + logging.error(f"No valid user-item data found in {input_file}. Exiting.") + sys.exit(1) + + # Step 2: 构建用户-物品矩阵 + user_item_matrix, user_list, item_list, user_index, item_index = build_user_item_matrix(user_items) + + # Step 3: 计算用户相似性 + similarity_matrix = compute_user_similarity(user_item_matrix) + + # Step 4: 为用户推荐物品 + recommendations = recommend_items(user_items, user_list, item_list, user_index, item_index, similarity_matrix, top_k) + + # Step 5: 输出推荐结果 + write_output(recommendations, output_file) + +if __name__ == '__main__': + main() diff --git a/offline_tasks/collaboration/utils/utils.cc b/offline_tasks/collaboration/utils/utils.cc new file mode 100644 index 0000000..c3231b0 --- /dev/null +++ b/offline_tasks/collaboration/utils/utils.cc @@ -0,0 +1,55 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +//#include +#include + +#include +#include +void split(std::vector& tokens, const std::string& s, const std::string& delimiters = " ") +{ + using namespace std; + string::size_type lastPos = s.find_first_not_of(delimiters, 0); + string::size_type pos = s.find_first_of(delimiters, lastPos); + while (string::npos != pos || string::npos != lastPos) { + tokens.push_back(s.substr(lastPos, pos - lastPos));//use emplace_back after C++11 + lastPos = s.find_first_not_of(delimiters, pos); + pos = s.find_first_of(delimiters, lastPos); + } +} + +std::string currentTimetoStr(void) { + char tmp[64]; + time_t t = time(NULL); + tm *_tm = localtime(&t); + int year = _tm->tm_year+1900; + int month = _tm->tm_mon+1; + int date = _tm->tm_mday; + int hh = _tm->tm_hour; + int mm = _tm->tm_min; + int ss = _tm->tm_sec; + sprintf(tmp,"%04d-%02d-%02d %02d:%02d:%02d", year,month,date,hh,mm,ss); + return std::string(tmp); +} + + +bool compare_i2ulist_map_iters(const std::unordered_map >::const_iterator & a, const std::unordered_map >::const_iterator & b) { + // vector长的排序后面 + return a->second.size() < b->second.size(); +} + +bool compare_pairs(const std::pair & a, const std::pair & b) { + // 分数大的排前面 + return a.second > b.second; +} + diff --git a/offline_tasks/doc/Redis数据规范.md b/offline_tasks/doc/Redis数据规范.md index f8c500e..46c1211 100644 --- a/offline_tasks/doc/Redis数据规范.md +++ b/offline_tasks/doc/Redis数据规范.md @@ -294,15 +294,21 @@ def load_interest_index(file_path, list_type, redis_client, expire_seconds=25920 ```bash cd /home/tw/recommendation/offline_tasks -# 加载所有索引(使用今天的数据) +# 加载所有索引(使用今天的数据,包括C++ Swing) python3 scripts/load_index_to_redis.py --redis-host localhost --redis-port 6379 # 加载指定日期的索引 python3 scripts/load_index_to_redis.py --date 20251016 --redis-host localhost -# 只加载i2i索引 +# 只加载i2i索引(包括C++ Swing) python3 scripts/load_index_to_redis.py --load-i2i --redis-host localhost +# 只加载C++ Swing索引 +python3 scripts/load_index_to_redis.py \ + --file ../collaboration/output/swing_similar.txt \ + --algorithm swing_cpp \ + --redis-host localhost + # 只加载兴趣聚合索引 python3 scripts/load_index_to_redis.py --load-interest --redis-host localhost ``` @@ -315,7 +321,10 @@ redis-cli # 检查key数量 DBSIZE -# 查看某个商品的相似推荐 +# 查看某个商品的相似推荐(C++ Swing) +GET item:similar:swing_cpp:3600052 + +# 查看某个商品的相似推荐(Python Swing) GET item:similar:swing:12345 # 查看平台热门商品 @@ -324,10 +333,17 @@ GET interest:hot:platform:pc # 查看所有i2i相关的key KEYS item:similar:* +# 查看C++ Swing的key +KEYS item:similar:swing_cpp:* + +# 查看Python Swing的key +KEYS item:similar:swing:* + # 查看所有interest相关的key KEYS interest:* # 检查key的过期时间 +TTL item:similar:swing_cpp:3600052 TTL item:similar:swing:12345 ``` @@ -337,6 +353,7 @@ TTL item:similar:swing:12345 | 索引类型 | Key数量 | 单条Value大小 | 总内存 | |---------|--------|-------------|--------| +| i2i_swing_cpp | 50,000 | ~400B | ~20MB | | i2i_swing | 50,000 | ~500B | ~25MB | | i2i_w2v | 50,000 | ~500B | ~25MB | | i2i_deepwalk | 50,000 | ~500B | ~25MB | @@ -346,7 +363,12 @@ TTL item:similar:swing:12345 | interest_cart | 10,000 | ~1KB | ~10MB | | interest_new | 5,000 | ~1KB | ~5MB | | interest_global | 10,000 | ~1KB | ~10MB | -| **总计** | **270,000** | - | **~160MB** | +| **总计** | **320,000** | - | **~180MB** | + +**说明**: +- C++ Swing数据更紧凑(无商品名),单条大小约400B +- 建议生产环境使用C++ Swing (`swing_cpp`),性能更优 +- Python Swing可作为对照组或特殊场景使用 ### 过期策略 @@ -373,17 +395,25 @@ TTL item:similar:swing:12345 ```python # 检查加载成功率 total_keys = redis_client.dbsize() -expected_keys = 245000 +expected_keys = 320000 # 更新:包含C++ Swing success_rate = total_keys / expected_keys * 100 # 检查数据完整性 sample_keys = [ - 'item:similar:swing:12345', + 'item:similar:swing_cpp:3600052', # C++ Swing + 'item:similar:swing:12345', # Python Swing + 'item:similar:w2v:12345', 'interest:hot:platform:pc' ] for key in sample_keys: if not redis_client.exists(key): print(f"Missing key: {key}") + +# 检查C++ Swing vs Python Swing覆盖率 +cpp_swing_count = len(redis_client.keys('item:similar:swing_cpp:*')) +py_swing_count = len(redis_client.keys('item:similar:swing:*')) +print(f"C++ Swing keys: {cpp_swing_count}") +print(f"Python Swing keys: {py_swing_count}") ``` ### 性能指标 -- libgit2 0.21.2