stream_parser.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #!/usr/bin/python
  2. # -*- coding=utf-8 -*-
  3. # @Create Time: 2024-01-15 16:43:04
  4. # @Last Modified time: 2024-01-22 11:30:31
  5. # from app.models import Record
  6. # import pandas as pd
  7. # class UserSpace(object):
  8. # def __init__(self, user: str):
  9. # self.user = user
  10. # self.df = pd.DataFrame()
  11. # def __repr__(self):
  12. # return f'<User: {self.user} DF: {self.df}>'
  13. # def push(self, item: dict):
  14. # self.df = pd.concat([self.df, pd.DataFrame([item])])
  15. # def pop(self):
  16. # pass
  17. # def predict(item):
  18. # if g.get(item['user']):
  19. # g[item['user']].push(item)
  20. # else:
  21. # g[item['user']] = UserSpace(item['user'])
  22. # g[item['user']].push(item)
  23. # if __name__ == '__main__':
  24. # g = dict()
  25. # predict({'user': '123', 'date': 123, 'domain': '人资域'})
  26. # predict({'user': '456', 'date': 456, 'domain': '营销域'})
  27. # predict({'user': '123', 'date': 789, 'domain': '营销域'})
  28. # print(g)
  29. """
  30. import time
  31. import logging
  32. import multiprocessing
  33. from datetime import datetime
  34. from multiprocessing.connection import Listener
  35. import pandas as pd
  36. logging.basicConfig(
  37. format='%(asctime)s %(levelname)s %(message)s',
  38. level=logging.INFO
  39. )
  40. logger = logging.getLogger('multilog')
  41. logger.setLevel(logging.DEBUG)
  42. class ProcessServer(object):
  43. DataFrame = pd.DataFrame(columns=['user', 'time', 'domain'])
  44. def __init__(self, host: str = "localhost", port: int = 5000):
  45. self.host = host
  46. self.port = port
  47. self.run_server()
  48. def do_socket(self, conn, addr, ):
  49. try:
  50. while True:
  51. if conn.poll(1) == False:
  52. time.sleep(0.5)
  53. continue
  54. data = conn.recv() # 等待接受数据
  55. conn.send('sucess')
  56. # ***********************
  57. # 要执行的程序写在这里
  58. # ***********************
  59. self.df = pd.concat([self.df, pd.DataFrame([data])])
  60. logger.info(data)
  61. logger.info(self.df)
  62. except Exception as e:
  63. logger.error(f'Socket Error {e}')
  64. finally:
  65. try:
  66. conn.close()
  67. logger.info(f'Connection close. {addr}')
  68. except:
  69. logger.error('close except')
  70. def run_server(self):
  71. server_sock = Listener((self.host, self.port))
  72. logger.info(f"Sever running... {self.host} {self.port}")
  73. pool = multiprocessing.Pool(10)
  74. while True:
  75. conn = server_sock.accept()
  76. addr = server_sock.last_accepted
  77. logger.info(f'Accept new connection {addr}')
  78. # 创建进程来处理TCP连接:
  79. pool.apply_async(func=self.do_socket, args=(conn, addr,))
  80. if __name__ == '__main__':
  81. server = ProcessServer(port=35010)
  82. server.run_server()
  83. """