五、话题(topic)
📌话题可理解为节点之间的通信
1.动手实现
(1)话题发布者
topic_publisher.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TopicPublisherNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.pub = self.create_publisher(String, "hello", 10) # 参数:类型,名称,队列长度
self.timer = self.create_timer(0.5, self.timer_callback) # 定时执行函数
def timer_callback(self):
msg = String()
msg.data = "Hello Subscribers"
self.pub.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
node = TopicPublisherNode("topic_hello_publisher")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
(2)话题订阅者
topic_subscriber.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TopicSubscriberNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.sub = self.create_subscription(String, "hello", self.sub_callback, 10)
def sub_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
node = TopicSubscriberNode("topic_hello_subscriber")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
(3)配置节点并运行
同时运行发布者和订阅者
2.进阶--三个节点配合实现人脸打卡
(1)环境配置
sudo pip3 install face_recognition
(2)先前准备--录入人脸
录入人脸只需要把每个人的正脸照片放进一个文件夹即可
我这里放在了\~/faces/
(3)摄像头画面采集节点
camera.py
import cv2
import rclpy
from rclpy.node import Node
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
class Camera(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.pub = self.create_publisher(Image, "camera", 10)
self.timer = self.create_timer(0.1, self.timer_callback)
self.cap = cv2.VideoCapture(0)
self.cv_bridge = CvBridge()
def timer_callback(self):
ret, frame = self.cap.read()
if ret:
self.pub.publish(self.cv_bridge.cv2_to_imgmsg(frame, 'bgr8'))
self.get_logger().info('Publishing video frame')
def main(args=None):
rclpy.init(args=args)
node = Camera("topic_camera_node")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
(4)人脸识别节点
face_recg.py
import os
import cv2
import rclpy
import face_recognition
from rclpy.node import Node
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from std_msgs.msg import String
class FaceRecg(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.sub = self.create_subscription(
Image, 'camera', self.listener_callback, 10
)
self.cv_bridge = CvBridge()
self.pub = self.create_publisher(String, "checkin", 10)
self.msg = String()
self.faces_path = '/home/sp1der/faces/'
self.known_names=[]
self.known_encodings=[]
for image_name in os.listdir(self.faces_path):
load_image = face_recognition.load_image_file(self.faces_path+image_name)
image_face_encoding = face_recognition.face_encodings(load_image)[0]
self.known_names.append(image_name.split(".")[0])
self.known_encodings.append(image_face_encoding)
def listener_callback(self, data):
self.get_logger().info('Receiving video')
frame = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8')
self.face_recg(frame)
def face_recg(self, frame):
rgb_frame = cv2.resize(frame, (0, 0), fx=1, fy=1)
face_locations = face_recognition.face_locations(rgb_frame)
face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
face_names = []
for face_encoding in face_encodings:
matches = face_recognition.compare_faces(self.known_encodings, face_encoding,tolerance=0.5)
if True in matches:
first_match_index = matches.index(True)
name = self.known_names[first_match_index]
else:
name="unknown"
face_names.append(name)
for (top, right, bottom, left), name in zip(face_locations, face_names):
cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)
self.msg.data = name
self.pub.publish(self.msg)
self.get_logger().info('%s checking in' % name)
cv2.imshow('frame', frame)
cv2.waitKey(1)
def main(args=None):
rclpy.init(args=args)
node = FaceRecg("topic_facerecg_node")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
(5)打卡记录节点
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Checkin(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.sub = self.create_subscription(String, "checkin", self.sub_callback, 10)
def sub_callback(self, msg):
self.get_logger().info('%s checked in' % msg.data)
def main(args=None):
rclpy.init(args=args)
node = Checkin("topic_checkin_node")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()