hot/main.py
import os
import json
import glob
import logging
from collections import defaultdict, Counter
from datetime import datetime, timedelta

# Logging setup; create the log directory up front so that basicConfig
# does not fail with FileNotFoundError on a fresh checkout.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    filename='logs/index_generation.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Hyper-parameters
CONFIG = {
    'base_dir': '../fetch_data/data/',
    'books_path': '../fetch_data/meta_data/all_books.json',
    'tenants_path': '../fetch_data/meta_data/tenants.json',
    'output_dir': './output',
    'days': 30,  # number of recent days of data files to read
    'top_n': 1000,  # size of each generated booklist
    'tenant_type_ratio': 0.01,  # weight for blending a tenant with its industry;
                                # addresses tenant cold start: the less behavior data
                                # a tenant has, the more its industry weighs in
    'use_simple_uv_processing': True  # which read-UV rule to use per book:
                                      # True  -> sum the per-day UVs over the window;
                                      # False -> count distinct users across the whole
                                      #          window, which is more affected by the
                                      #          exposure configured by operations.
                                      # Defaults to True.
}
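
# Illustration of the two UV modes (hypothetical numbers): if user u1 reads
# book b1 on day 1 and again on day 2, the simple mode counts 2 (per-day UVs
# summed), while the set-based mode counts 1 (distinct users over the window).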

  

def load_json_files(path_pattern):
    """Load JSON-lines records from every file matching a glob pattern."""
    files = glob.glob(path_pattern)
    data = []
    for file in files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")
    return data

def load_books_data(books_path):
    """Load the book attribute dictionary, keyed by book ID as a string."""
    books_data = {}
    with open(books_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            book = json.loads(line)

            tags = book.get('merged_tags', '')
            category1 = book.get('category1', '')
            category2 = book.get('category2', '')
            combined_tags = ','.join(filter(lambda x: x not in [None, ''], [tags, category1, category2]))
            books_data[str(book['id'])] = combined_tags  # normalize book['id'] to str

    logging.info(f"Loaded {len(books_data)} books from {books_path}")
    return books_data

def load_tenants_data(tenants_path):
    """Load the tenant -> industry (tenant_type) dictionary, keyed by tenant ID as a string."""
    tenants_data = {}
    with open(tenants_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            tenant = json.loads(line)
            tenant_type = tenant.get('tenant_type') or ''  # treat missing or null as ''
            tenants_data[str(tenant['id'])] = tenant_type  # normalize tenant['id'] to str
    logging.info(f"Loaded {len(tenants_data)} tenants from {tenants_path}")
    return tenants_data

def get_recent_files(base_dir, days=30):
    """Collect reading-time data files from the most recent `days` days."""
    today = datetime.today()
    recent_files = []
    for i in range(days):
        date_str = (today - timedelta(days=i)).strftime('%Y%m%d')
        path_pattern = os.path.join(base_dir, f'reading_time.json.{date_str}')
        recent_files.extend(glob.glob(path_pattern))
    logging.info(f"Found {len(recent_files)} files for the last {days} days")
    return recent_files
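
# For example, run on 2024-09-01 (an illustrative date) with the default config,
# this matches '../fetch_data/data/reading_time.json.20240901' back through
# '../fetch_data/data/reading_time.json.20240803'.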

  

def process_reading_data_by_uv(reading_files, books_data, tenants_data):
    """Process reading data as distinct-user UV over the whole window."""
    tenant_uv = defaultdict(lambda: defaultdict(set))  # sets deduplicate users
    tenant_type_uv = defaultdict(lambda: defaultdict(set))
    tag_uv = defaultdict(lambda: defaultdict(set))

    for file in reading_files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line.strip())
                    user_id = str(record.get('user_id', ''))      # normalize IDs to str
                    book_id = str(record.get('book_id', ''))
                    tenant_id = str(record.get('tenant_id', ''))

                    if not book_id or not tenant_id or not user_id:
                        continue

                    tenant_uv[tenant_id][book_id].add(user_id)
                    tenant_type = tenants_data.get(tenant_id, '')  # tenant_id is already a str
                    tenant_type_uv[tenant_type][book_id].add(user_id)

                    tags = books_data.get(book_id, '').split(',')
                    for tag in tags:
                        if tag:
                            tag_uv[tag][book_id].add(user_id)

                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")

    # Convert each user set into a UV count (number of distinct user_ids)
    tenant_uv_count = {tenant: Counter({book: len(users) for book, users in books.items()})
                       for tenant, books in tenant_uv.items()}
    tenant_type_uv_count = {tenant_type: Counter({book: len(users) for book, users in books.items()})
                            for tenant_type, books in tenant_type_uv.items()}
    tag_uv_count = {tag: Counter({book: len(users) for book, users in books.items()})
                    for tag, books in tag_uv.items()}

    logging.info(f"Processed reading data, total tenants: {len(tenant_uv_count)}, tenant types: {len(tenant_type_uv_count)}, tags: {len(tag_uv_count)}")

    return tenant_uv_count, tenant_type_uv_count, tag_uv_count
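
# Both processing functions expect one JSON object per line; an illustrative record:
#   {"user_id": 42, "book_id": 1001, "tenant_id": 7}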

  

def process_reading_data(reading_files, books_data, tenants_data):
    """Process reading data with the simple accumulation logic: each record
    contributes 1, so per-day UVs are summed rather than deduplicated across days."""
    tenant_uv = defaultdict(Counter)
    tenant_type_uv = defaultdict(Counter)
    tag_uv = defaultdict(Counter)

    for file in reading_files:
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line.strip())
                    user_id = str(record.get('user_id', ''))      # normalize IDs to str
                    book_id = str(record.get('book_id', ''))
                    tenant_id = str(record.get('tenant_id', ''))

                    if not book_id or not tenant_id:
                        continue

                    tenant_uv[tenant_id][book_id] += 1
                    tenant_type = tenants_data.get(tenant_id, '')  # tenant_id is already a str
                    tenant_type_uv[tenant_type][book_id] += 1

                    tags = books_data.get(book_id, '').split(',')
                    for tag in tags:
                        if tag:
                            tag_uv[tag][book_id] += 1

                except json.JSONDecodeError:
                    logging.error(f"Failed to parse JSON line in {file}: {line}")

    logging.info(f"Processed reading data, total tenants: {len(tenant_uv)}, tenant types: {len(tenant_type_uv)}, tags: {len(tag_uv)}")

    return tenant_uv, tenant_type_uv, tag_uv

def generate_top_booklist(counter_dict, top_n=1000):
    """Generate the sorted top-`top_n` booklist for each key."""
    result = {}
    for key, counter in counter_dict.items():
        top_books = counter.most_common(top_n)
        if not key or len(top_books) == 0:
            continue
        result[key] = ','.join([f'{bid}:{uv}' for bid, uv in top_books])
    return result
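
# For example (illustrative values):
#   generate_top_booklist({'t1': Counter({'b1': 12, 'b2': 7})}, top_n=2)
#   -> {'t1': 'b1:12,b2:7'}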

  

def write_output(data, output_dir, prefix, current_date):
    """Write a dated output file and point a stable symlink at it in the output directory."""
    try:
        output_file_path = os.path.join(output_dir, f'{prefix}_{current_date}.txt')
        output_file_link = os.path.join(output_dir, f'{prefix}.txt')

        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        with open(output_file_path, 'w', encoding='utf-8') as f:
            for key, booklist in data.items():
                key = key.replace('\t', ' ')  # tabs in the key would break the TSV format
                if not key or not booklist:
                    continue
                f.write(f"{key}\t{booklist}\n")

        logging.info(f"Output written to {output_file_path}")

        if os.path.islink(output_file_link) or os.path.exists(output_file_link):
            os.remove(output_file_link)

        os.symlink(os.path.basename(output_file_path), output_file_link)
        logging.info(f"Symlink created at {output_file_link} pointing to {output_file_path}")

    except Exception as e:
        logging.error(f"Error writing output or creating symlink: {str(e)}")
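
# Each output file holds one '<key>\t<book_id>:<uv>,<book_id>:<uv>,...' line per
# key, and '<prefix>.txt' is kept as a relative symlink to the latest dated file.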

  

def merge_tenant_uv_with_type_uv(tenant_uv, tenant_type_uv, tenants_data, ratio=CONFIG['tenant_type_ratio']):
    """Merge each tenant's UV counts with the UV counts of its industry (tenant_type).

    Purpose: blending in industry-level UV data smooths over tenants with sparse
    behavior data, lending them industry-level weight and avoiding the statistical
    bias that small samples introduce.

    `ratio` controls the weight of the industry UV counts in the blend: a higher
    ratio lets industry data dominate, a lower ratio lets the tenant's own data
    dominate.
    """
    merged_tenant_uv = defaultdict(Counter)

    for tenant_id, books_counter in tenant_uv.items():
        # Look up this tenant's industry (tenant_type)
        tenant_type = tenants_data.get(tenant_id, '')

        # UV counts for that industry
        tenant_type_counter = tenant_type_uv.get(tenant_type, Counter())

        # Blend the tenant's own UV counts with the industry counts scaled by `ratio`
        for book_id, uv_count in books_counter.items():
            tenant_type_uv_adjusted = int(tenant_type_counter.get(book_id, 0) * ratio)
            merged_tenant_uv[tenant_id][book_id] = uv_count + tenant_type_uv_adjusted

    logging.info(f"Merged tenant UV with tenant type UV using ratio {ratio}")
    return merged_tenant_uv
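
# Worked example (illustrative numbers): with ratio=0.01, a tenant whose own UV
# for a book is 5, in an industry whose UV for the same book is 300, ends up
# with a merged score of 5 + int(300 * 0.01) = 8.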

  

def main():
    # Today's date, used to stamp the output files
    current_date = datetime.today().strftime('%Y%m%d')

    # Load book and tenant metadata
    books_data = load_books_data(CONFIG['books_path'])
    tenants_data = load_tenants_data(CONFIG['tenants_path'])

    # Collect reading-data files for the configured number of recent days
    reading_files = get_recent_files(CONFIG['base_dir'], days=CONFIG['days'])

    # Pick the UV processing logic according to the config
    if CONFIG['use_simple_uv_processing']:
        tenant_uv, tenant_type_uv, tag_uv = process_reading_data(reading_files, books_data, tenants_data)
    else:
        tenant_uv, tenant_type_uv, tag_uv = process_reading_data_by_uv(reading_files, books_data, tenants_data)

    # Blend tenant UV with tenant_type UV at the configured ratio
    merged_tenant_uv = merge_tenant_uv_with_type_uv(tenant_uv, tenant_type_uv, tenants_data, ratio=CONFIG['tenant_type_ratio'])

    # Generate the top-N booklists
    tenant_booklist = generate_top_booklist(merged_tenant_uv, top_n=CONFIG['top_n'])
    tenant_type_booklist = generate_top_booklist(tenant_type_uv, top_n=CONFIG['top_n'])
    tag_booklist = generate_top_booklist(tag_uv, top_n=CONFIG['top_n'])

    # Write the output files and refresh the symlinks
    write_output(tenant_booklist, CONFIG['output_dir'], 'tenant_booklist', current_date)
    write_output(tenant_type_booklist, CONFIG['output_dir'], 'tenant_type_booklist', current_date)
    write_output(tag_booklist, CONFIG['output_dir'], 'tag_booklist', current_date)


if __name__ == '__main__':
    main()
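
# Typical invocation (assumed deployment, e.g. a daily cron job; the path is hypothetical):
#   0 2 * * * cd /path/to/hot && python main.py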