#!/usr/bin/python # -*- coding=utf-8 -*- # @Create Time: 2024-01-15 16:43:04 # @Last Modified time: 2024-01-22 11:30:31 # from app.models import Record # import pandas as pd # class UserSpace(object): # def __init__(self, user: str): # self.user = user # self.df = pd.DataFrame() # def __repr__(self): # return f'' # def push(self, item: dict): # self.df = pd.concat([self.df, pd.DataFrame([item])]) # def pop(self): # pass # def predict(item): # if g.get(item['user']): # g[item['user']].push(item) # else: # g[item['user']] = UserSpace(item['user']) # g[item['user']].push(item) # if __name__ == '__main__': # g = dict() # predict({'user': '123', 'date': 123, 'domain': '人资域'}) # predict({'user': '456', 'date': 456, 'domain': '营销域'}) # predict({'user': '123', 'date': 789, 'domain': '营销域'}) # print(g) """ import time import logging import multiprocessing from datetime import datetime from multiprocessing.connection import Listener import pandas as pd logging.basicConfig( format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO ) logger = logging.getLogger('multilog') logger.setLevel(logging.DEBUG) class ProcessServer(object): DataFrame = pd.DataFrame(columns=['user', 'time', 'domain']) def __init__(self, host: str = "localhost", port: int = 5000): self.host = host self.port = port self.run_server() def do_socket(self, conn, addr, ): try: while True: if conn.poll(1) == False: time.sleep(0.5) continue data = conn.recv() # 等待接受数据 conn.send('sucess') # *********************** # 要执行的程序写在这里 # *********************** self.df = pd.concat([self.df, pd.DataFrame([data])]) logger.info(data) logger.info(self.df) except Exception as e: logger.error(f'Socket Error {e}') finally: try: conn.close() logger.info(f'Connection close. {addr}') except: logger.error('close except') def run_server(self): server_sock = Listener((self.host, self.port)) logger.info(f"Sever running... {self.host} {self.port}") pool = multiprocessing.Pool(10) while True: conn = server_sock.accept() addr = server_sock.last_accepted logger.info(f'Accept new connection {addr}') # 创建进程来处理TCP连接: pool.apply_async(func=self.do_socket, args=(conn, addr,)) if __name__ == '__main__': server = ProcessServer(port=35010) server.run_server() """