123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- #!/usr/bin/python
- # -*- coding=utf-8 -*-
- # @Create Time: 2024-01-15 16:43:04
- # @Last Modified time: 2024-01-22 11:30:31
- # from app.models import Record
- # import pandas as pd
- # class UserSpace(object):
- # def __init__(self, user: str):
- # self.user = user
- # self.df = pd.DataFrame()
- # def __repr__(self):
- # return f'<User: {self.user} DF: {self.df}>'
- # def push(self, item: dict):
- # self.df = pd.concat([self.df, pd.DataFrame([item])])
- # def pop(self):
- # pass
- # def predict(item):
- # if g.get(item['user']):
- # g[item['user']].push(item)
- # else:
- # g[item['user']] = UserSpace(item['user'])
- # g[item['user']].push(item)
- # if __name__ == '__main__':
- # g = dict()
- # predict({'user': '123', 'date': 123, 'domain': '人资域'})
- # predict({'user': '456', 'date': 456, 'domain': '营销域'})
- # predict({'user': '123', 'date': 789, 'domain': '营销域'})
- # print(g)
- """
- import time
- import logging
- import multiprocessing
- from datetime import datetime
- from multiprocessing.connection import Listener
- import pandas as pd
- logging.basicConfig(
- format='%(asctime)s %(levelname)s %(message)s',
- level=logging.INFO
- )
- logger = logging.getLogger('multilog')
- logger.setLevel(logging.DEBUG)
- class ProcessServer(object):
- DataFrame = pd.DataFrame(columns=['user', 'time', 'domain'])
- def __init__(self, host: str = "localhost", port: int = 5000):
- self.host = host
- self.port = port
- self.run_server()
- def do_socket(self, conn, addr, ):
- try:
- while True:
- if conn.poll(1) == False:
- time.sleep(0.5)
- continue
- data = conn.recv() # 等待接受数据
- conn.send('sucess')
- # ***********************
- # 要执行的程序写在这里
- # ***********************
- self.df = pd.concat([self.df, pd.DataFrame([data])])
- logger.info(data)
- logger.info(self.df)
-
- except Exception as e:
- logger.error(f'Socket Error {e}')
- finally:
- try:
- conn.close()
- logger.info(f'Connection close. {addr}')
- except:
- logger.error('close except')
- def run_server(self):
- server_sock = Listener((self.host, self.port))
- logger.info(f"Sever running... {self.host} {self.port}")
- pool = multiprocessing.Pool(10)
- while True:
- conn = server_sock.accept()
- addr = server_sock.last_accepted
- logger.info(f'Accept new connection {addr}')
- # 创建进程来处理TCP连接:
- pool.apply_async(func=self.do_socket, args=(conn, addr,))
- if __name__ == '__main__':
- server = ProcessServer(port=35010)
- server.run_server()
- """
|