tasks.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-18 09:47:28
  4. # @Last Modified time: 2024-01-23 11:53:14
import datetime
import json
import logging
import re
from threading import Thread

import dmPython
import pandas as pd
from pandas import DataFrame

from .preprocess import parse_url
from .UserActivateDegreeAnalyze import UserActivateDegreeAnalyzeSpace
  13. def func(item):
  14. print(item)
  15. session = DM8Session()
  16. try:
  17. result = session.insert("""INSERT INTO FRAGMENT("Time", "User_Id", "Method", "Has_Form", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?,?,?,?)""",
  18. (item['time'],item['user'],item['method'],item['hasform'],item['domain'],item['一级标题'],item['二级标题'],item['三级标题'],item['四级标题'],item['text'], item['stay']))
  19. except Exception as e:
  20. print(e)
  21. # class filteredRealTimeLogDStream(object):
  22. # time: str
  23. # user_id: str
  24. # method: str
  25. # hasform: bool
  26. # domain: str
  27. # title1: str
  28. # title2: str
  29. # title3: str
  30. # title4: str
  31. # behavior: str
  32. # stay: int = 0
  33. class DM8Session(object):
  34. def __init__(self):
  35. self.conn = dmPython.connect(user='SYSDBA', password='SYSDBA001', server='192.168.1.202', port=30236)
  36. def query(self, sql_cmd: str, args=None):
  37. '''以数据框形式返回查询据结果'''
  38. try:
  39. self.cursor = self.conn.cursor()
  40. if args:
  41. self.cursor.execute(sql_cmd, args)
  42. else:
  43. self.cursor.execute(sql_cmd)
  44. data = self.cursor.fetchall() # 以元组形式返回查询数据
  45. header = [t[0] for t in self.cursor.description]
  46. df = pd.DataFrame(list(data), columns=header) # pd.DataFrame 对列表具有更好的兼容性
  47. except Exception as e:
  48. print(e)
  49. finally:
  50. self.cursor.close()
  51. return df
  52. def insert(self, sql_cmd: str, args):
  53. try:
  54. self.cursor = self.conn.cursor()
  55. # 执行sql语句
  56. self.cursor.execute(sql_cmd, args)
  57. self.conn.commit()
  58. status = True
  59. except Exception as e:
  60. # 发生错误时回滚
  61. self.conn.rollback()
  62. print(e)
  63. status = False
  64. finally:
  65. self.cursor.close()
  66. return status
  67. def update(self, sql_cmd: str, args):
  68. try:
  69. self.cursor = self.conn.cursor()
  70. self.cursor.execute(sql_cmd, args)
  71. self.conn.commit()
  72. status = True
  73. except Exception as e:
  74. self.conn.rollback()
  75. print(e)
  76. status = False
  77. finally:
  78. self.cursor.close()
  79. return status
  80. def __del__(self):
  81. self.conn.close()
  82. class UrlParser(object):
  83. def __init__(self):
  84. self.rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8')
  85. self.zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8')
  86. self.cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8')
  87. self.yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8')
  88. def process(self, row: dict):
  89. # row['Current_Url'] = row['Current_Url'].apply(lambda x: (x))
  90. row['Current_Url'] = parse_url(row['Current_Url'])
  91. row['Target_Url'] = parse_url(row['Target_Url'])
  92. # return self.process_record(row.iloc[0])
  93. return self.process_record(row)
  94. def process_record(self, item: dict):
  95. i = item['Current_Url']
  96. j = item['Target_Url']
  97. # result = item
  98. result = None
  99. if i[1] == '10.10.21.23':
  100. result = {'domain': '人资域'}
  101. if i[5].get("appCode"):
  102. task = self.rzdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
  103. result = {
  104. 'time': item['Record_Time'],
  105. 'user': item['User_Id'],
  106. 'method': item['Request_Method'],
  107. 'hasform': True if item['Form_Data'] else False,
  108. 'domain': '人资域',
  109. '一级标题': task['一级标题'].values[0],
  110. '二级标题': task['二级标题'].values[0],
  111. '三级标题': task['三级标题'].values[0],
  112. 'text': item['Button_Text'] if item['Button_Text'] else None
  113. }
  114. elif i[1] == '10.10.21.28':
  115. result = {'domain': '资产域'}
  116. if i[5].get('appCode'):
  117. task = self.zcdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
  118. result = {
  119. 'time': item['Record_Time'],
  120. 'user': item['User_Id'],
  121. 'method': item['Request_Method'],
  122. 'hasform': True if item['Form_Data'] else False,
  123. 'domain': '资产域',
  124. '一级标题': task['一级标题'].values[0],
  125. '二级标题': task['二级标题'].values[0],
  126. '三级标题': task['三级标题'].values[0],
  127. 'text': item['Button_Text'] if item['Button_Text'] else None
  128. }
  129. elif i[1] == 'fms.gmp.cloud.hq.iv.csg':
  130. result = {'domain': '财务域'}
  131. if i[5].get('appCode'):
  132. task = self.cwdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
  133. result = {
  134. 'time': item['Record_Time'],
  135. 'user': item['User_Id'],
  136. 'method': item['Request_Method'],
  137. 'hasform': True if item['Form_Data'] else False,
  138. 'domain': '财务域',
  139. '一级标题': task['一级标题'].values[0],
  140. '二级标题': task['二级标题'].values[0],
  141. '三级标题': task['三级标题'].values[0],
  142. 'text': item['Button_Text'] if item['Button_Text'] else None
  143. }
  144. elif i[1] == '10.150.23.1:8010':
  145. result = {'domain': '营销域'}
  146. fd = item['Form_Data']
  147. if item['Form_Data'] and item['Form_Data'] != 'None':
  148. try:
  149. if item['Form_Data'][0] != "{":
  150. item["Params_Data"] = item['Form_Data']
  151. form_data = None
  152. else:
  153. item['Form_Data'] = item['Form_Data'].replace("\"remark\":\"[", "\"remark\":\"\"[")
  154. item['Form_Data'] = item['Form_Data'].replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}")
  155. item['Form_Data'] = item['Form_Data'].replace("object HTMLInputElement", "").replace("[null]", "[]").replace("\"null\"", "\"\"")
  156. item['Form_Data'] = item['Form_Data'].replace("\n", "")
  157. form_data = json.loads(item['Form_Data'])
  158. except Exception as e:
  159. logging.error(item['Form_Data'])
  160. logging.error(fd)
  161. form_data = None
  162. raise e
  163. else:
  164. form_data = None
  165. if form_data and '_INVOKE_FUNC_TITLE_' in form_data:
  166. title = form_data['_INVOKE_FUNC_TITLE_'][0]
  167. appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1]
  168. logging.debug(appcontext)
  169. task = self.yxdf.query(f'''四级标题 == "{title}" & appcontext == "{appcontext}"''')
  170. if task.empty:
  171. task = self.yxdf.query(f'''三级标题 == "{title}" & appcontext == "{appcontext}"''')
  172. if task.empty:
  173. task = self.yxdf.query(f'''四级标题 == "{title}"''')
  174. if task.empty:
  175. task = self.yxdf.query(f'''三级标题 == "{title}"''')
  176. try:
  177. result = {
  178. 'time': item['Record_Time'],
  179. 'user': item['User_Id'],
  180. 'method': item['Request_Method'],
  181. 'hasform': True,
  182. 'domain': '营销域',
  183. '一级标题': task['一级标题'].values[0],
  184. '二级标题': task['二级标题'].values[0],
  185. '三级标题': task['三级标题'].values[0],
  186. '四级标题': task['四级标题'].values[0],
  187. 'text': item['Button_Text'] if item['Button_Text'] else None
  188. }
  189. except Exception as e:
  190. logging.error(task)
  191. logging.error(item['Form_Data'])
  192. logging.error(title)
  193. logging.error(e)
  194. logging.error(form_data['_INVOKE_FUNC_URL_'][0])
  195. else:
  196. result = {
  197. 'time': item['Record_Time'],
  198. 'user': item['User_Id'],
  199. 'domain': '营销域',
  200. 'hasform': False,
  201. 'text': item['Button_Text'] if item['Button_Text'] else None
  202. }
  203. elif i[1] == '4a.gd.csg.local':
  204. result = {
  205. 'time': item['Record_Time'],
  206. 'user': item['User_Id'],
  207. '域': '登录门户'
  208. }
  209. return result
  210. class UserVisitAnalyzeSpace(object):
  211. """用户访问分析
  212. 接收用户创建的分析任务
  213. 时间范围:起始时间~结束时间
  214. 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
  215. 执行submit,将taskid作为参数传递给submit
  216. """
  217. def __init__(self, user: str):
  218. self.user = user
  219. self.pool_size = 10
  220. self.start_point = 0
  221. self.end_point = 0
  222. # 一级缓存
  223. self.cache_item = dict()
  224. # 二级缓存数据
  225. self.df_queue = pd.DataFrame(columns=['time','user','method','hasform','domain','一级标题','二级标题','三级标题','四级标题','text', 'stay'])
  226. def __repr__(self):
  227. return f'<User: {self.user} DF: {self.df_queue}>'
  228. def push(self, item: dict):
  229. # 消息进队列
  230. self.df_queue.loc[self.end_point] = item
  231. self.end_point = (self.end_point + 1) % self.pool_size
  232. def pop(self):
  233. # 消息出队列
  234. item = self.df_queue.loc[self.start_point]
  235. self.start_point = (self.start_point + 1) % self.pool_size
  236. return item
  237. def send(self, item: dict):
  238. # 当前缓存无数据
  239. if not self.cache_item:
  240. self.cache_item = item
  241. else:
  242. # 判断缓存内数据和新数据哪个更新
  243. if item['time'] > self.cache_item['time']:
  244. # 当前数据较新,计算页面持续时间
  245. delay = (item['time'] - self.cache_item['time']).seconds
  246. if (delay < self.cache_item['stay'] + 2) and (item['三级标题'] == self.cache_item['三级标题']):
  247. self.cache_item['stay'] += delay
  248. else:
  249. self.push(self.cache_item)
  250. self.cache_item = item
  251. t = Thread(target=func, args=(self.pop(),))
  252. t.start()
  253. else:
  254. # 老数据较新,数据发送产生异常,数据产生积压
  255. pass
  256. # self.df = pd.concat([self.df, pd.DataFrame([item])])
  257. # if self.df.shape[0] >= self.pool_size:
  258. # # 新建临时表
  259. # dt = self.df[0:1]
  260. # index = 0
  261. # for i in range(1, self.pool_size):
  262. # d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i])
  263. # if isinstance(d2, pd.DataFrame):
  264. # dt.iloc[index] = d1
  265. # index += 1
  266. # dt.iloc[index] = d2
  267. # session = DM8Session()
  268. # result = session.insert("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""",
  269. # args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'], d1.get('四级标题'), d1['text'], d1['stay']))
  270. # else:
  271. # dt.iloc[index] = d1
  272. # self.df = pd.concat([dt, self.df[self.pool_size:]])
  273. def getActionByDateRange(self):
  274. """获取指定日期范围内的用户访问行为数据
  275. """
  276. pass
  277. def getSession2Action(self) -> DataFrame:
  278. """获取sessionid到访问行为数据的映射的
  279. """
  280. pass
  281. def aggregateBySession(self):
  282. """对行为数据按session粒度进行聚合,并将用户信息join后聚合
  283. """
  284. pass
  285. def filterSsessionAndAggrStat(self):
  286. """过滤session数据,并进行计数器值累加
  287. """
  288. pass
  289. def getSession2detail(self):
  290. """获取通过筛选条件的session的访问明细数据
  291. """
  292. pass
  293. def getTopKCategory(self):
  294. """获取topK热门分类
  295. 第一步:获取符合条件的访问过的所有品类
  296. 第二步:计算各品类的点击的次数
  297. 第三步:自定义二次排序key
  298. 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
  299. 第五步:取出top10热门品类,并写入DM
  300. """
  301. pass
  302. def getClickCategory2Count(self):# -> DataFrame
  303. """获取各分类点击次数
  304. 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
  305. """
  306. pass
  307. def getTopKSession(self):
  308. """获取每个分类top10活跃用户
  309. 1、将topK热门分类的id,生成DF
  310. 2、计算topK分类被各用户访问的次数
  311. 3、分组取TopK算法实现,获取每个分类的topK活跃用户
  312. 4、获取topK活跃用户的明细数据,并写入DM
  313. """
  314. pass
  315. # --- 池化 --- #
  316. def pooling(self, line1: dict, line2: dict):
  317. if line1['time'] > line2['time']:
  318. deltat = (line1['time'] - line2['time']).seconds
  319. if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
  320. line2['stay'] += deltat
  321. return line2, None
  322. else:
  323. return line2, line1
  324. else:
  325. deltat = (line2['time'] - line1['time']).seconds
  326. if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
  327. line1['stay'] += deltat
  328. return line1, None
  329. else:
  330. return line1, line2
  331. class BusinessClickRealTimeSpace(object):
  332. def __init__(self):
  333. self.aggregatedDStream = pd.DataFrame(columns=['word', 'count'])
  334. def calculateRealTimeStat(self, sourceDStream: dict) -> DataFrame:
  335. """计算业务访问流量实时统计
  336. 计算每天业务的访问量
  337. 设计维度:日期时间、用户、业务、访问次数
  338. 可以看到当天所有的实时数据,
  339. 通过DF,直接统计出全局的访问次数,在DF缓存一份,在DM保存一份
  340. """
  341. """
  342. 我们要对原始数据进行map,映射成<date_user_domain,1>格式
  343. 然后,对上述格式的数据,执行updateStateByKey算子
  344. """
  345. def _filter(data: dict) -> tuple:
  346. return {"word": data["time"].strftime('%Y-%m-%dT%H:%M:%S') + "_" + data["user"] + "_" + data["domain"], "count": 1}
  347. mappedDStream = _filter(sourceDStream)
  348. self.aggregatedDStream = self.aggregatedDStream.append(mappedDStream, ignore_index=True)
  349. self.aggregatedDStream = self.aggregatedDStream.groupby('word').apply(lambda x: sum(x['count'])).reset_index(name="count")
  350. # 将计算出来的结果同步一份到DM
  351. session = DM8Session()
  352. for row in self.aggregatedDStream.itertuples():
  353. try:
  354. result = session.insert("""INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""", (row.word, row.count))
  355. except Exception as e:
  356. print(e)
  357. return self.aggregatedDStream
  358. def calculateTopKBusiness(self):
  359. """计算每天用户的TopK访问业务
  360. 计算出每天各业务的点击量
  361. """
  362. pass
  363. def calculateBusinessCountByWindow(self):
  364. """计算最近1小时滑动窗口内的业务访问趋势
  365. """
  366. pass
  367. # class PageOneStepSpace(object):
  368. # def __init__(self):
  369. # pass
  370. # def getSession2Action(self):
  371. # """获取<session,用户访问行为>格式的数据
  372. # """
  373. # pass
  374. # def persistConvertRate(self):
  375. # """持久转化率
  376. # """
  377. # pass
  378. def capital_to_higher(dict_info):
  379. new_dict = {}
  380. for i, j in dict_info.items():
  381. new_dict[re.sub("([a-zA-Z])", lambda x: x.groups()[0].upper(), i, 1)] = j
  382. return new_dict
  383. def predict(g: dict, item: dict):
  384. item = capital_to_higher(item)
  385. if g.get(item['User_Id']):
  386. g[item['User_Id']].push(item)
  387. else:
  388. g[item['User_Id']] = UserActivateDegreeAnalyzeSpace(item['User_Id'])
  389. g[item['User_Id']].push(item)
  390. # session = DM8Session()
  391. # # 添加采集量统计
  392. # try:
  393. # df = session.query("""SELECT "ID", "Count" FROM TOTALCOUNT WHERE "Name" = ?""", args="采集量")
  394. # if df.shape[0] == 1:
  395. # result = session.update("""UPDATE TOTALCOUNT SET "Count"=? WHERE TOTALCOUNT.ID = ?""", (int(df['Count'])+1, int(df['ID'])))
  396. # except Exception as e:
  397. # print(e)
  398. # # URL预处理存储
  399. # parser = UrlParser()
  400. # frag = parser.process(item)
  401. # frag['stay'] = 0
  402. # 添加序列
  403. # if g.get(item['User_Id']):
  404. # # g[item['User_Id']].push(item)
  405. # g[item['User_Id']].send(frag)
  406. # else:
  407. # g[item['User_Id']] = UserSpace(item['User_Id'])
  408. # # g[item['User_Id']].push(item)
  409. # g[item['User_Id']].send(frag)
  410. # if g.get('BusinessClickRealTimeSpace'):
  411. # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
  412. # else:
  413. # g['BusinessClickRealTimeSpace'] = BusinessClickRealTimeSpace()
  414. # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
  415. # print(result)
  416. """
  417. from sqlalchemy import Column, Integer, String, Date, Numeric, Text
  418. from sqlalchemy.ext.declarative import declarative_base
  419. # 创建对象的基类:
  420. Base = declarative_base()
  421. class Product(Base):
  422. # 表的名字:
  423. __tablename__ = 'product'
  424. # 表的结构:
  425. PRODUCTID = Column(Integer,autoincrement=True, primary_key=True)
  426. from sqlalchemy import create_engine
  427. from sqlalchemy.orm import sessionmaker
  428. from Product import Base, Product
  429. def fun_select_all(DBSession):
  430. # 创建Session
  431. session = DBSession()
  432. # 查询所有的
  433. list_product = session.query(Product).all()
  434. print('查询所有结果:')
  435. for product in list_product:
  436. print(product.NAME)# , product.AUTHOR, product.PUBLISHER )
  437. print('')
  438. session.close()
  439. def fun_insert(DBSession):
  440. # 创建Session
  441. session = DBSession()
  442. new_product = Product()
  443. new_product.NAME = '水浒传'
  444. session.add(new_product)
  445. session.commit()
  446. print('插入成功')
  447. session.close()
  448. def fun_update(DBSession):
  449. # 创建Session
  450. session = DBSession()
  451. product = session.query(Product).filter(Product.NAME == '水浒传').one()
  452. product.NAME = '水浒'
  453. session.commit()
  454. print('更新成功')
  455. session.close()
  456. def fun_delete(DBSession):
  457. # 创建Session
  458. session = DBSession()
  459. session.query(Product).filter(Product.NAME == '水浒').delete()
  460. session.commit()
  461. print('删除成功')
  462. session.close()
  463. def main():
  464. #dialect 是SQLAlchemy用来与各种类型的DBAPI和数据库通信的系统。
  465. conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236'
  466. #创建Engine对象
  467. engine = create_engine(conn_url)
  468. #创建DBSession对象
  469. DBSession = sessionmaker(bind=engine)
  470. Base.metadata.create_all(engine) # 创建表结构
  471. fun_select_all(DBSession)
  472. # # 插入
  473. fun_insert(DBSession)
  474. fun_select_all(DBSession)
  475. # # 插入
  476. fun_insert1(DBSession)
  477. fun_select_all(DBSession)
  478. # # 更新
  479. fun_update(DBSession)
  480. fun_select_all(DBSession)
  481. # # 删除
  482. fun_delete(DBSession)
  483. fun_select_all(DBSession)
  484. """