tasks.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-18 09:47:28
  4. # @Last Modified time: 2024-02-02 13:37:27
import datetime
import json
import logging
import re
from threading import Thread

import dmPython
import numpy as np
import pandas as pd
import requests
from pandas import DataFrame

from .preprocess import parse_url
from .UserActivateDegreeAnalyze import UserActivateDegreeAnalyzeSpace
  15. class ResultJson:
  16. def __init__(self, time, user_id, domain, title1, title2, title3, device_id=None, text=None, is_op=True, is_bus=False, is_bias=False, status="正常"):
  17. # 执行时间
  18. self.time = time.strftime('%Y-%m-%d %H:%M:%S')
  19. # 执行用户
  20. self.user_id = user_id
  21. # 执行域
  22. self.domain = domain
  23. self.title1 = title1
  24. self.title2 = title2
  25. # 业务项
  26. self.title3 = title3
  27. # 设备
  28. self.device_id = device_id
  29. # 操作步骤
  30. self.text = text
  31. # 是否用户操作
  32. self.is_op = is_op
  33. # 是否业务
  34. self.is_bus = is_bus
  35. # 是否偏离
  36. self.is_bias = is_bias
  37. # 执行结果
  38. self.status = status
  39. def to_json(self):
  40. return {
  41. "real": { # 实时信息
  42. "time": self.time, # 记录时间
  43. "userId": self.user_id, # 用户ID
  44. "deviceId": self.device_id, # 设备ID
  45. "domain": self.domain, # 访问域
  46. "business": self.title3, # 业务类型
  47. "option": self.text, # 执行操作
  48. "isOp": self.is_op, # 是否是一条操作
  49. "isBus": self.is_bus, # 是否是一项业务
  50. "isBias": self.is_bias, # 是否偏离
  51. "status": self.status # 执行结果
  52. },
  53. "base": { # 定时信息
  54. "用户ID": self.user_id,
  55. "常用时段": "",
  56. "所在部门": "",
  57. "使用设备": "",
  58. "操作系统": "",
  59. "常用业务": ""
  60. }
  61. }
  62. def func(item):
  63. print(item)
  64. session = DM8Session()
  65. try:
  66. result = session.execute("""INSERT INTO FRAGMENT("Time", "User_Id", "Method", "Has_Form", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?,?,?,?)""",
  67. (item['time'], item['user'], item['method'], item['hasform'], item['domain'], item['一级标题'], item['二级标题'], item['三级标题'], item['四级标题'], item['text'], item['stay']))
  68. except Exception as e:
  69. print(e)
  70. def postback(json_data: dict) -> None:
  71. url = 'http://192.168.1.166:8080/statistics/algorithm'
  72. data = ResultJson(time=datetime.datetime.now(), user_id="fuli@mz.gd.csg.cn", domain="财务域",
  73. title1="计划预算管理", title2="预算查询分析", title3="编制过程查询", text="69", is_op=True, is_bus=True)
  74. print(data.to_json())
  75. response = requests.post(url, json={'parameter': data.to_json()})
  76. if response.status_code == 200:
  77. print('success')
  78. else:
  79. print(response.text())
  80. class DM8Session(object):
  81. def __init__(self):
  82. self.conn = dmPython.connect(user='SYSDBA', password='SYSDBA001', server='192.168.1.202', port=30236)
  83. def query(self, sql_cmd: str, args=None):
  84. '''以数据框形式返回查询据结果'''
  85. try:
  86. self.cursor = self.conn.cursor()
  87. if args:
  88. self.cursor.execute(sql_cmd, args)
  89. else:
  90. self.cursor.execute(sql_cmd)
  91. data = self.cursor.fetchall() # 以元组形式返回查询数据
  92. header = [t[0] for t in self.cursor.description]
  93. df = pd.DataFrame(list(data), columns=header) # pd.DataFrame 对列表具有更好的兼容性
  94. except Exception as e:
  95. print(e)
  96. finally:
  97. self.cursor.close()
  98. return df
  99. def execute(self, sql_cmd: str, args):
  100. try:
  101. self.cursor = self.conn.cursor()
  102. # 执行sql语句
  103. self.cursor.execute(sql_cmd, args)
  104. self.conn.commit()
  105. status = True
  106. except Exception as e:
  107. # 发生错误时回滚
  108. self.conn.rollback()
  109. print(e)
  110. status = False
  111. finally:
  112. self.cursor.close()
  113. return status
  114. def __del__(self):
  115. self.conn.close()
  116. class UrlParser(object):
  117. def __init__(self):
  118. self.rzdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/人资域.json', orient='records', lines=True, encoding='utf-8')
  119. self.zcdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/资产域.json', orient='records', lines=True, encoding='utf-8')
  120. self.cwdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/财务域.json', orient='records', lines=True, encoding='utf-8')
  121. self.yxdf = pd.read_json(path_or_buf='/mnt/d/desktop/梅州电网/营销域.json', orient='records', lines=True, encoding='utf-8')
  122. def process(self, row: dict):
  123. # row['Current_Url'] = row['Current_Url'].apply(lambda x: (x))
  124. row['Current_Url'] = parse_url(row['Current_Url'])
  125. row['Target_Url'] = parse_url(row['Target_Url'])
  126. # return self.process_record(row.iloc[0])
  127. return self.process_record(row)
  128. def process_record(self, item: dict):
  129. i = item['Current_Url']
  130. result = None
  131. if i[1] == '10.10.21.23':
  132. result = {'domain': '人资域'}
  133. if i[2] == '/gmp/login.html':
  134. result = {
  135. 'time': item['Record_Time'],
  136. 'user': item['User_Id'],
  137. 'method': item['Request_Method'],
  138. 'hasform': True if item['Form_Data'] else False,
  139. 'domain': '人资域',
  140. '一级标题': '人资域登录',
  141. '二级标题': '人资域登录',
  142. '三级标题': '人资域登录',
  143. 'text': item['Button_Text'] if item['Button_Text'] else None
  144. }
  145. elif i[2] == '/gmp/static/gmpweb/hrIndex.html':
  146. result = {
  147. 'time': item['Record_Time'],
  148. 'user': item['User_Id'],
  149. 'method': item['Request_Method'],
  150. 'hasform': True if item['Form_Data'] else False,
  151. 'domain': '人资域',
  152. '一级标题': '人资域首页',
  153. '二级标题': '人资域首页',
  154. '三级标题': '人资域首页',
  155. 'text': item['Button_Text'] if item['Button_Text'] else None
  156. }
  157. elif i[2] == '/gmp/static/gmpweb/workbench/todo/TodoCenter.html':
  158. result = {
  159. 'time': item['Record_Time'],
  160. 'user': item['User_Id'],
  161. 'method': item['Request_Method'],
  162. 'hasform': True if item['Form_Data'] else False,
  163. 'domain': '人资域',
  164. '一级标题': '人资域',
  165. '二级标题': '公共应用',
  166. '三级标题': '代办中心',
  167. 'text': item['Button_Text'] if item['Button_Text'] else None
  168. }
  169. elif i[2] == 'hrcore_web/labormanagement/contractagreementview/myAgreement.html':
  170. result = {
  171. 'time': item['Record_Time'],
  172. 'user': item['User_Id'],
  173. 'method': item['Request_Method'],
  174. 'hasform': True if item['Form_Data'] else False,
  175. 'domain': '人资域',
  176. '一级标题': '人资域',
  177. '二级标题': '个人应用',
  178. '三级标题': '我的协议',
  179. 'text': item['Button_Text'] if item['Button_Text'] else None
  180. }
  181. elif i[5].get("appCode"):
  182. task = self.rzdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
  183. result = {
  184. 'time': item['Record_Time'],
  185. 'user': item['User_Id'],
  186. 'method': item['Request_Method'],
  187. 'hasform': True if item['Form_Data'] else False,
  188. 'domain': '人资域',
  189. '一级标题': task['一级标题'].values[0],
  190. '二级标题': task['二级标题'].values[0],
  191. '三级标题': task['三级标题'].values[0],
  192. 'text': item['Button_Text'] if item['Button_Text'] else None
  193. }
  194. else: # 后续使用 iframe 处理
  195. pass
  196. elif i[1] == '10.10.21.28':
  197. result = {'domain': '资产域'}
  198. if i[2] == '/gmp/static/gmpweb/index.html':
  199. result = {
  200. 'time': item['Record_Time'],
  201. 'user': item['User_Id'],
  202. 'method': item['Request_Method'],
  203. 'hasform': True if item['Form_Data'] else False,
  204. 'domain': '资产域',
  205. '一级标题': '资产域首页',
  206. '二级标题': '资产域首页',
  207. '三级标题': '资产域首页',
  208. 'text': item['Button_Text'] if item['Button_Text'] else None
  209. }
  210. elif i[5].get('appCode'):
  211. task = self.zcdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
  212. result = {
  213. 'time': item['Record_Time'],
  214. 'user': item['User_Id'],
  215. 'method': item['Request_Method'],
  216. 'hasform': True if item['Form_Data'] else False,
  217. 'domain': '资产域',
  218. '一级标题': task['一级标题'].values[0],
  219. '二级标题': task['二级标题'].values[0],
  220. '三级标题': task['三级标题'].values[0],
  221. 'text': item['Button_Text'] if item['Button_Text'] else None
  222. }
  223. elif i[1] == 'fms.gmp.cloud.hq.iv.csg':
  224. result = {'domain': '财务域'}
  225. if i[2] == '/gmp/login.html':
  226. result = {
  227. 'time': item['Record_Time'],
  228. 'user': item['User_Id'],
  229. 'method': item['Request_Method'],
  230. 'hasform': True if item['Form_Data'] else False,
  231. 'domain': '财务域',
  232. '一级标题': '登录财务域',
  233. '二级标题': '登录财务域',
  234. '三级标题': '登录财务域',
  235. 'text': item['Button_Text']
  236. }
  237. elif i[2] == '/gmp/index.html':
  238. result = {
  239. 'time': item['Record_Time'],
  240. 'user': item['User_Id'],
  241. 'method': item['Request_Method'],
  242. 'hasform': True if item['Form_Data'] else False,
  243. 'domain': '财务域',
  244. '一级标题': '财务域首页',
  245. '二级标题': '财务域首页',
  246. '三级标题': '财务域首页',
  247. 'text': item['Button_Text']
  248. }
  249. elif i[2] == '/gmp/': # 待确认
  250. result = {
  251. 'time': item['Record_Time'],
  252. 'user': item['User_Id'],
  253. 'method': item['Request_Method'],
  254. 'hasform': True if item['Form_Data'] else False,
  255. 'domain': '财务域',
  256. '一级标题': '财务域认证',
  257. '二级标题': '财务域认证',
  258. '三级标题': '财务域认证',
  259. 'text': item['Button_Text']
  260. }
  261. elif i[5].get('appCode'):
  262. task = self.cwdf.query(f'''path == "{i[2]}" & appCode == "{i[5]['appCode'][0]}"''')
  263. result = {
  264. 'time': item['Record_Time'],
  265. 'user': item['User_Id'],
  266. 'method': item['Request_Method'],
  267. 'hasform': True if item['Form_Data'] else False,
  268. 'domain': '财务域',
  269. '一级标题': task['一级标题'].values[0],
  270. '二级标题': task['二级标题'].values[0],
  271. '三级标题': task['三级标题'].values[0],
  272. 'text': item['Button_Text']
  273. }
  274. elif i[1] == '10.150.23.1:8010':
  275. result = {'domain': '营销域'}
  276. if i[2] == '/PMS/login.do':
  277. result = {
  278. 'time': item['Record_Time'],
  279. 'user': item['User_Id'],
  280. 'method': item['Request_Method'],
  281. 'hasform': True if item['Form_Data'] else False,
  282. 'domain': '营销域',
  283. '一级标题': '营销域登录',
  284. '二级标题': '营销域登录',
  285. '三级标题': '营销域登录',
  286. 'text': item['Button_Text'] if item['Button_Text'] else None
  287. }
  288. else:
  289. fd = item['Form_Data']
  290. if item['Form_Data'] and item['Form_Data'] != 'None':
  291. try:
  292. if item['Form_Data'][0] != "{":
  293. item["Params_Data"] = item['Form_Data']
  294. form_data = None
  295. else:
  296. item['Form_Data'] = item['Form_Data'].replace("\"remark\":\"[", "\"remark\":\"\"[")
  297. item['Form_Data'] = item['Form_Data'].replace("\"[", "[").replace("]\"", "]").replace("\"{", "{").replace("}\"", "}")
  298. item['Form_Data'] = item['Form_Data'].replace("object HTMLInputElement", "").replace("[null]", "[]").replace("\"null\"", "\"\"")
  299. item['Form_Data'] = item['Form_Data'].replace("\n", "")
  300. form_data = json.loads(item['Form_Data'])
  301. except Exception as e:
  302. logging.error(item['Form_Data'])
  303. logging.error(fd)
  304. form_data = None
  305. raise e
  306. else:
  307. form_data = None
  308. if form_data and '_INVOKE_FUNC_TITLE_' in form_data:
  309. title = form_data['_INVOKE_FUNC_TITLE_'][0]
  310. appcontext = form_data['_INVOKE_FUNC_URL_'][0].split('/')[1]
  311. logging.debug(appcontext)
  312. task = self.yxdf.query(f'''四级标题 == "{title}" & appcontext == "{appcontext}"''')
  313. if task.empty:
  314. task = self.yxdf.query(f'''三级标题 == "{title}" & appcontext == "{appcontext}"''')
  315. if task.empty:
  316. task = self.yxdf.query(f'''四级标题 == "{title}"''')
  317. if task.empty:
  318. task = self.yxdf.query(f'''三级标题 == "{title}"''')
  319. try:
  320. result = {
  321. 'time': item['Record_Time'],
  322. 'user': item['User_Id'],
  323. 'method': item['Request_Method'],
  324. 'hasform': True,
  325. 'domain': '营销域',
  326. '一级标题': task['一级标题'].values[0],
  327. '二级标题': task['二级标题'].values[0],
  328. '三级标题': task['三级标题'].values[0],
  329. '四级标题': task['四级标题'].values[0],
  330. 'text': item['Button_Text'] if item['Button_Text'] else None
  331. }
  332. except Exception as e:
  333. logging.error(task)
  334. logging.error(item['Form_Data'])
  335. logging.error(title)
  336. logging.error(e)
  337. logging.error(form_data['_INVOKE_FUNC_URL_'][0])
  338. else:
  339. result = {
  340. 'time': item['Record_Time'],
  341. 'user': item['User_Id'],
  342. 'method': item['Request_Method'],
  343. 'hasform': False,
  344. 'domain': '营销域',
  345. '一级标题': '登录门户',
  346. '二级标题': '登录门户',
  347. '三级标题': '登录门户',
  348. 'text': item['Button_Text'] if item['Button_Text'] else None
  349. }
  350. elif i[1] == '4a.gd.csg.local':
  351. result = {
  352. 'time': item['Record_Time'],
  353. 'user': item['User_Id'],
  354. 'method': item['Request_Method'],
  355. 'hasform': True if item['Form_Data'] else False,
  356. 'domain': '登录门户',
  357. '一级标题': '登录门户',
  358. '二级标题': '登录门户',
  359. '三级标题': '登录门户',
  360. 'text': item['Button_Text'] if item['Button_Text'] else None
  361. }
  362. return result
  363. class UserVisitAnalyzeSpace(object):
  364. """用户访问分析
  365. 接收用户创建的分析任务
  366. 时间范围:起始时间~结束时间
  367. 在接收用户创建的任务请求之后,会将任务信息插入MD的task表中,任务参数以JSON格式封装
  368. 执行submit,将taskid作为参数传递给submit
  369. """
  370. def __init__(self, user: str):
  371. self.user = user
  372. self.pool_size = 10
  373. self.start_point = 0
  374. self.end_point = 0
  375. # 一级缓存
  376. self.cache_item = dict()
  377. # 二级缓存数据
  378. self.df_queue = pd.DataFrame(columns=['time','user','method','hasform','domain','一级标题','二级标题','三级标题','四级标题','text', 'stay'])
  379. def __repr__(self):
  380. return f'<User: {self.user} DF: {self.df_queue}>'
  381. def push(self, item: dict):
  382. # 消息进队列
  383. self.df_queue.loc[self.end_point] = item
  384. self.end_point = (self.end_point + 1) % self.pool_size
  385. def pop(self):
  386. # 消息出队列
  387. item = self.df_queue.loc[self.start_point]
  388. self.start_point = (self.start_point + 1) % self.pool_size
  389. return item
  390. def send(self, item: dict):
  391. # 当前缓存无数据
  392. if not self.cache_item:
  393. self.cache_item = item
  394. else:
  395. # 判断缓存内数据和新数据哪个更新
  396. if item['time'] > self.cache_item['time']:
  397. # 当前数据较新,计算页面持续时间
  398. delay = (item['time'] - self.cache_item['time']).seconds
  399. if (delay < self.cache_item['stay'] + 2) and (item['三级标题'] == self.cache_item['三级标题']):
  400. self.cache_item['stay'] += delay
  401. else:
  402. self.push(self.cache_item)
  403. self.cache_item = item
  404. t = Thread(target=func, args=(self.pop(),))
  405. t.start()
  406. else:
  407. # 老数据较新,数据发送产生异常,数据产生积压
  408. pass
  409. # self.df = pd.concat([self.df, pd.DataFrame([item])])
  410. # if self.df.shape[0] >= self.pool_size:
  411. # # 新建临时表
  412. # dt = self.df[0:1]
  413. # index = 0
  414. # for i in range(1, self.pool_size):
  415. # d1, d2 = self.pooling(dt.iloc[index], self.df.iloc[i])
  416. # if isinstance(d2, pd.DataFrame):
  417. # dt.iloc[index] = d1
  418. # index += 1
  419. # dt.iloc[index] = d2
  420. # session = DM8Session()
  421. # result = session.execute("""INSERT INTO FRAGMENT("Time", "User_Id", "Dom", "Title1", "Title2", "Title3", "Title4", "Behavior", "Stay") VALUES(?,?,?,?,?,?,?,?)""",
  422. # args=(d1['time'], d1['user'], d1['domain'], d1['一级标题'], d1['二级标题'], d1['三级标题'], d1.get('四级标题'), d1['text'], d1['stay']))
  423. # else:
  424. # dt.iloc[index] = d1
  425. # self.df = pd.concat([dt, self.df[self.pool_size:]])
  426. def getActionByDateRange(self):
  427. """获取指定日期范围内的用户访问行为数据
  428. """
  429. pass
  430. def getSession2Action(self) -> DataFrame:
  431. """获取sessionid到访问行为数据的映射的
  432. """
  433. pass
  434. def aggregateBySession(self):
  435. """对行为数据按session粒度进行聚合,并将用户信息join后聚合
  436. """
  437. pass
  438. def filterSsessionAndAggrStat(self):
  439. """过滤session数据,并进行计数器值累加
  440. """
  441. pass
  442. def getSession2detail(self):
  443. """获取通过筛选条件的session的访问明细数据
  444. """
  445. pass
  446. def getTopKCategory(self):
  447. """获取topK热门分类
  448. 第一步:获取符合条件的访问过的所有品类
  449. 第二步:计算各品类的点击的次数
  450. 第三步:自定义二次排序key
  451. 第四步:将数据映射成<CategorySortKey,info>格式的RDD,然后进行二次排序(降序)
  452. 第五步:取出top10热门品类,并写入DM
  453. """
  454. pass
  455. def getClickCategory2Count(self) -> DataFrame:
  456. """获取各分类点击次数
  457. 1、过滤出点击行为数据,点击数据只占总数据的一小部分,所以过滤后数据可能不均匀
  458. """
  459. pass
  460. def getTopKSession(self):
  461. """获取每个分类top10活跃用户
  462. 1、将topK热门分类的id,生成DF
  463. 2、计算topK分类被各用户访问的次数
  464. 3、分组取TopK算法实现,获取每个分类的topK活跃用户
  465. 4、获取topK活跃用户的明细数据,并写入DM
  466. """
  467. pass
  468. # --- 池化 --- #
  469. def pooling(self, line1: dict, line2: dict):
  470. if line1['time'] > line2['time']:
  471. deltat = (line1['time'] - line2['time']).seconds
  472. if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
  473. line2['stay'] += deltat
  474. return line2, None
  475. else:
  476. return line2, line1
  477. else:
  478. deltat = (line2['time'] - line1['time']).seconds
  479. if (deltat < 2) and (line1['三级标题'] == line2['三级标题']):
  480. line1['stay'] += deltat
  481. return line1, None
  482. else:
  483. return line1, line2
  484. class BusinessClickRealTimeSpace(object):
  485. def __init__(self):
  486. self.aggregatedDStream = pd.DataFrame(columns=['word', 'count'])
  487. def calculateRealTimeStat(self, sourceDStream: dict) -> DataFrame:
  488. """计算业务访问流量实时统计
  489. 计算每天业务的访问量
  490. 设计维度:日期时间、用户、业务、访问次数
  491. 可以看到当天所有的实时数据,
  492. 通过DF,直接统计出全局的访问次数,在DF缓存一份,在DM保存一份
  493. """
  494. """
  495. 我们要对原始数据进行map,映射成<date_user_domain,1>格式
  496. 然后,对上述格式的数据,执行updateStateByKey算子
  497. """
  498. def _filter(data: dict) -> tuple:
  499. return {"word": data["time"].strftime('%Y-%m-%dT%H:%M:%S') + "_" + data["user"] + "_" + data["domain"], "count": 1}
  500. mappedDStream = _filter(sourceDStream)
  501. self.aggregatedDStream = self.aggregatedDStream.append(mappedDStream, ignore_index=True)
  502. self.aggregatedDStream = self.aggregatedDStream.groupby('word').apply(lambda x: sum(x['count'])).reset_index(name="count")
  503. # 将计算出来的结果同步一份到DM
  504. session = DM8Session()
  505. for row in self.aggregatedDStream.itertuples():
  506. try:
  507. result = session.execute("""INSERT INTO TOTALVISIT("Name", "Count") VALUES(?,?)""", (row.word, row.count))
  508. except Exception as e:
  509. print(e)
  510. return self.aggregatedDStream
  511. def calculateTopKBusiness(self):
  512. """计算每天用户的TopK访问业务
  513. 计算出每天各业务的点击量
  514. """
  515. pass
  516. def calculateBusinessCountByWindow(self):
  517. """计算最近1小时滑动窗口内的业务访问趋势
  518. """
  519. pass
  520. # 字典键首字母大写
  521. # def capital_to_higher(dict_info):
  522. # new_dict = {}
  523. # for i, j in dict_info.items():
  524. # new_dict[re.sub("([a-zA-Z])", lambda x: x.groups()[0].upper(), i, 1)] = j
  525. # return new_dict
  526. # 字典键全改小写
  527. def capital_to_lower(dict_info):
  528. new_dict = {}
  529. for i, j in dict_info.items():
  530. new_dict[i.lower()] = j
  531. return new_dict
  532. def predict(g: dict, item: dict):
  533. # 字典键全改小写
  534. item = capital_to_lower(item)
  535. # 放入全局
  536. if g.get(item['user_id']):
  537. g[item['user_id']].push(item)
  538. else:
  539. g[item['user_id']] = UserActivateDegreeAnalyzeSpace(item['user_id'])
  540. g[item['user_id']].push(item)
  541. # item = capital_to_higher(item)
  542. # if g.get(item['User_Id']):
  543. # g[item['User_Id']].push(item)
  544. # else:
  545. # g[item['User_Id']] = UserActivateDegreeAnalyzeSpace(item['User_Id'])
  546. # g[item['User_Id']].push(item)
  547. # session = DM8Session()
  548. # # 添加采集量统计
  549. # try:
  550. # df = session.query("""SELECT "ID", "Count" FROM TOTALCOUNT WHERE "Name" = ?""", args="采集量")
  551. # if df.shape[0] == 1:
  552. # result = session.execute("""UPDATE TOTALCOUNT SET "Count"=? WHERE TOTALCOUNT.ID = ?""", (int(df['Count'])+1, int(df['ID'])))
  553. # except Exception as e:
  554. # print(e)
  555. # # URL预处理存储
  556. # parser = UrlParser()
  557. # frag = parser.process(item)
  558. # frag['stay'] = 0
  559. # 添加序列
  560. # if g.get(item['User_Id']):
  561. # # g[item['User_Id']].push(item)
  562. # g[item['User_Id']].send(frag)
  563. # else:
  564. # g[item['User_Id']] = UserSpace(item['User_Id'])
  565. # # g[item['User_Id']].push(item)
  566. # g[item['User_Id']].send(frag)
  567. # if g.get('BusinessClickRealTimeSpace'):
  568. # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
  569. # else:
  570. # g['BusinessClickRealTimeSpace'] = BusinessClickRealTimeSpace()
  571. # result = g['BusinessClickRealTimeSpace'].calculateRealTimeStat(frag)
  572. # print(result)
  573. """
  574. from sqlalchemy import Column, Integer, String, Date, Numeric, Text
  575. from sqlalchemy.ext.declarative import declarative_base
  576. # 创建对象的基类:
  577. Base = declarative_base()
  578. class Product(Base):
  579. # 表的名字:
  580. __tablename__ = 'product'
  581. # 表的结构:
  582. PRODUCTID = Column(Integer,autoincrement=True, primary_key=True)
  583. from sqlalchemy import create_engine
  584. from sqlalchemy.orm import sessionmaker
  585. from Product import Base, Product
  586. def fun_select_all(DBSession):
  587. # 创建Session
  588. session = DBSession()
  589. # 查询所有的
  590. list_product = session.query(Product).all()
  591. print('查询所有结果:')
  592. for product in list_product:
  593. print(product.NAME)# , product.AUTHOR, product.PUBLISHER )
  594. print('')
  595. session.close()
  596. def fun_insert(DBSession):
  597. # 创建Session
  598. session = DBSession()
  599. new_product = Product()
  600. new_product.NAME = '水浒传'
  601. session.add(new_product)
  602. session.commit()
  603. print('插入成功')
  604. session.close()
  605. def fun_update(DBSession):
  606. # 创建Session
  607. session = DBSession()
  608. product = session.query(Product).filter(Product.NAME == '水浒传').one()
  609. product.NAME = '水浒'
  610. session.commit()
  611. print('更新成功')
  612. session.close()
  613. def fun_delete(DBSession):
  614. # 创建Session
  615. session = DBSession()
  616. session.query(Product).filter(Product.NAME == '水浒').delete()
  617. session.commit()
  618. print('删除成功')
  619. session.close()
  620. def main():
  621. #dialect 是SQLAlchemy用来与各种类型的DBAPI和数据库通信的系统。
  622. conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@192.168.1.202:30236'
  623. #创建Engine对象
  624. engine = create_engine(conn_url)
  625. #创建DBSession对象
  626. DBSession = sessionmaker(bind=engine)
  627. Base.metadata.create_all(engine) # 创建表结构
  628. fun_select_all(DBSession)
  629. # # 插入
  630. fun_insert(DBSession)
  631. fun_select_all(DBSession)
  632. # # 插入
  633. fun_insert1(DBSession)
  634. fun_select_all(DBSession)
  635. # # 更新
  636. fun_update(DBSession)
  637. fun_select_all(DBSession)
  638. # # 删除
  639. fun_delete(DBSession)
  640. fun_select_all(DBSession)
  641. """