Skeletal Animation Multithread Face
face_mesh_server.py
Go to the documentation of this file.
1import socket
2import threading
3import cv2
4import mediapipe as mp
5import numpy as np
6import json
7
8
10
def __init__(self):
    """Bind the TCP server socket, create the face-mesh detector and open the camera.

    Raises:
        OSError: If the local host name cannot be resolved or the address
            cannot be bound. The server socket is closed before re-raising.
    """
    # TCP port the server listens on
    self.port = 5052

    # Text encoding used for outgoing messages
    self.format = 'utf-8'

    # Resolve the local IP address from the machine's host name
    self.server_ip = socket.gethostbyname(socket.gethostname())

    # (ip, port) pair the server socket is bound to
    self.address = (self.server_ip, self.port)

    # Create the TCP server socket
    self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        # SO_REUSEADDR must be set on the listening socket itself, and before
        # bind(), so the port can be re-bound right after a restart.
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Bind the socket to the address
        self.server.bind(self.address)
    except OSError:
        # Do not leak the descriptor when the address cannot be bound
        self.server.close()
        raise

    # Documented entry point of the MediaPipe Face Mesh solution
    mp_face_mesh = mp.solutions.face_mesh

    # Initialize face mesh with its default options
    self.face_mesh = mp_face_mesh.FaceMesh()

    # Start capturing from camera device 0
    self.cap = cv2.VideoCapture(0)
41
42 # Start listening for connections
def start_server(self):
    """Accept clients forever, serving each one on its own thread.

    Blocks the calling thread. The accept loop has no normal exit; it is only
    left when an exception (for example KeyboardInterrupt or OSError)
    propagates, and the listening socket is closed on the way out.
    """
    try:
        self.server.listen()

        print(f"[LISTENING] Server is listening on {self.server_ip}")

        while True:
            # Block until the next client connects
            conn, addr = self.server.accept()

            # Serve the client on a dedicated thread so accept() can resume
            worker = threading.Thread(target=self.handle_client, args=(conn, addr))
            worker.start()
    finally:
        # Release the listening socket however the loop was left
        self.server.close()
57
58 # Handle the client
def handle_client(self, conn, addr):
    """Stream face-landmark point clouds to one client until it disconnects.

    Every camera frame in which landmarks are found is sent as one JSON
    object ``{"x": [...], "y": [...], "z": [...]}`` encoded with
    ``self.format``. Frames without landmarks are skipped.

    Args:
        conn: Connected client socket. It is always closed before returning.
        addr: Client address, used only in log output.
    """
    print(f"[NEW CONNECTION] {addr} connected.")

    try:
        while True:
            # Grab the next camera frame
            ret, img = self.cap.read()
            if not ret or img is None:
                # No frame available: stop instead of feeding None to the detector
                print(f"[CAMERA] No frame available, closing {addr}.")
                break

            # OpenCV delivers BGR frames while MediaPipe expects RGB, so
            # reverse the channel axis (copied to keep the array contiguous).
            rgb = np.ascontiguousarray(img[:, :, ::-1])

            # Get pointcloud
            pc_x, pc_y, pc_z = self.get_pointcloud(rgb)

            # Nothing to send when no face was found in this frame
            if not pc_x:
                continue

            # Serialise the point cloud as a JSON object
            pc_data = json.dumps({"x": pc_x, "y": pc_y, "z": pc_z})

            # sendall() keeps writing until the whole payload has been sent;
            # send() may stop after a partial write.
            # NOTE(review): messages are sent back to back with no delimiter;
            # left as is because the client's framing is not visible here.
            conn.sendall(pc_data.encode(self.format))
    except OSError:
        # The client went away (reset / broken pipe); end this handler quietly
        print(f"[DISCONNECTED] {addr} disconnected.")
    finally:
        conn.close()
85
86 # Get point cloud
def get_pointcloud(self, img):
    """Run the face-mesh detector on ``img`` and split the landmarks by axis.

    Args:
        img: Image handed unchanged to ``self.face_mesh.process``.

    Returns:
        A tuple ``(xs, ys, zs)`` of lists holding the ``x``, ``y`` and ``z``
        attribute of every landmark of every detected face, in detection
        order. All three lists are empty when no face is reported.
    """
    detection = self.face_mesh.process(img)

    # Guard: nothing detected, so hand back three empty coordinate lists
    if not detection.multi_face_landmarks:
        return ([], [], [])

    # Flatten the landmarks of all faces into a single sequence
    points = [
        point
        for face in detection.multi_face_landmarks
        for point in face.landmark
    ]

    xs = [point.x for point in points]
    ys = [point.y for point in points]
    zs = [point.z for point in points]

    return (xs, ys, zs)
105
106
# Script entry: constructing the server binds the socket and opens the camera;
# start_server() then blocks in the accept loop.
face_mesh_server = FaceMeshServer()
face_mesh_server.start_server()
face_mesh
Face landmarking.
def handle_client(self, conn, addr)