Skip to the content.

ASV NODE

Este es un esquemático del funcionamiento del código de navegación y utilización de los sensores :

Esquematico del codigo de ASV

A continuación, se detalla una explicacion mas detallada del codigo de navegacion y utilizacion de los sensores:

En este fragmento de código se encarga de inicializar parámetros, declarar servicios y declarar temas para el nodo asv_node:

    def initialize_parameters(self):

        # Get parameters from ROSPARAM
        self.declare_parameter('debug', True)
        self.get_parameter('debug').get_parameter_value().bool_value
        self.declare_parameter('use_path_planner', True)
        self.use_path_planner = self.get_parameter('use_path_planner').get_parameter_value().bool_value

    def declare_services(self):

        # This node connects to the following services:
        # 1 - /mavros/set_mode mavros_msgs/srv/SetMode
        self.set_mode_point_client = self.create_client(SetMode, '/mavros/set_mode')
        # 2 - /path_planner_service asv_interfaces/srv/PathPlanner
        self.path_planner_client = self.create_client(PathPlanner, '/ASV/path_planner_service')
        # 3 - /mavros/mission/clear mavros_msgs/srv/WaypointClear
        self.wp_clear_client = self.create_client(WaypointClear, '/mavros/mission/clear')
        # 4 - /mavros/mission/push mavros_msgs/srv/WaypointPush
        self.wp_push_client = self.create_client(WaypointPush, '/mavros/mission/push')

    def declare_topics(self):

        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1
        )

        qos_profile_BEF = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1
        )

        # Subscriptions
        # Subscribe to the state of the vehicle /mavros/state
        self.state_subscriber = self.create_subscription(State, '/mavros/state', self.state_topic_callback, qos_profile)
        # Subscribe to the WP reached topic /mavros/mission/reached
        self.wp_reached_subscriber = self.create_subscription(WaypointReached, '/mavros/mission/reached', self.wp_reached_topic_callback, qos_profile)
        # Subscribe to the battery topic /mavros/battery
        self.battery_subscriber = self.create_subscription(BatteryState, '/mavros/battery', self.battery_topic_callback, qos_profile_BEF)
        # Subscribe to the start_asv topic /start_asv
        self.start_asv_subscriber = self.create_subscription(Bool, '/start_asv', self.start_asv_callback, qos_profile)
        # Subscribe to the wp_target topic /wp_target (FROM MQTT)
        self.wp_target_subscriber = self.create_subscription(GlobalPositionTarget, '/wp_target', self.wp_target_callback, qos_profile)
        # Subscribe to the wp_clear topic /wp_clear (FROM MQTT)
        self.wp_clear_subscriber = self.create_subscription(Bool, '/wp_clear', self.wp_clear_callback, qos_profile)
        # Subscribe to the asv position topic /mavros/global_position/global
        self.asv_position_subscriber = self.create_subscription(NavSatFix, '/mavros/global_position/global', self.asv_position_callback, qos_profile_BEF)

Cada suscripción está asociada con una función de devolución de llamada que se ejecutará cuando se reciban nuevos mensajes en los temas respectivos. Además, se definen perfiles de calidad de servicio (QoS) para comunicación confiable y de mejor esfuerzo.

En este siguiente fragmento del código se definen varios métodos de devolución de llamada (callback) que se activarán cuando se actualicen ciertos temas en el nodo ASV. Además, en el método init, inicializa varios atributos y objetos necesarios para el funcionamiento del nodo.


    def asv_position_callback(self, msg):
        # This function is called when the asv position topic is updated
        #self.get_logger().info(f"ASV position: {msg.latitude}, {msg.longitude}")
        self.asv_position = msg

    def state_topic_callback(self, msg):
        # This function is called when the state topic is updated
        # self.get_logger().info(f"State MODE: {msg.mode}")
        self.asv_mode = msg.mode
        self.asv_armed = msg.armed

    def wp_reached_topic_callback(self, msg):
        # This function is called when the WP is reached
        self.get_logger().info(f"WP reached: {msg.wp_seq}")

        if msg.wp_seq == self.mission_length - 1:
            self.wp_reached = True
            self.get_logger().info(f"Last WP reached!")

    def battery_topic_callback(self, msg):
        # This function is called when the battery topic is updated
        self.battery = msg.voltage

    def start_asv_callback(self, msg):
        # This function is called when the start_asv topic is updated
        self.get_logger().info(f"ASV has been called to start!")
        self.start_asv = True
    
    def wp_target_callback(self, msg):
        # This function is called when the wp_target topic is updated
        self.get_logger().info(f"New target WP received: {msg.latitude}, {msg.longitude}")

        self.wp_queue.put(msg, block=False) # Put the WP in the queue without blocking the main thread

    def wp_clear_callback(self, msg):
        # Just put the flag to msg #
        self.get_logger().info(f"WP cleared!")
        self.wp_cleared = True


    def __init__(self):
        super().__init__('asv_node')
        
        # Declare some parameters #
        self.initialize_parameters()

        # Initialise the mission state
        self.wp_reached = False
        self.wp_cleared = False
        self.start_asv = False
        
        # The WP Queue
        self.wp_queue = queue.Queue() # To store the WPs
        self.FSM_STATE = "IDLE"
        self.asv_mode = "MANUAL"
        self.asv_armed = False

A continuación llegamos al main loop del nodo asv:

    def main_loop(self):
            # This is the main loop of the ASV node # 
            # It is basically a FMS

            # If the mode is in manual, to IDLE
            if not self.asv_mode in ['GUIDED','AUTO'] or not self.asv_armed:
                
                if self.counter % 50 == 0:
                    self.get_logger().warning(f"We are in {self.asv_mode} mode, and ASV is {'armed' if self.asv_armed else 'not armed'}. Going to IDLE.")
                
                self.FSM_STATE = 'IDLE'
                self.mission_length = 0 # Reset the mission length

            # If the WP is cleared, go to IDLE #
            if self.wp_cleared:
                
                self.FSM_STATE = 'WAIT_WP'
                self.wp_cleared = False
                self.wp_queue = queue.Queue() # Clear the queue
                self.mission_length = 0 # Reset the mission length

                # Call the clear service - Sincronously #
                future = self.wp_clear_client.call_async(WaypointClear.Request())
                # Wait for the response #
                rclpy.spin_until_future_complete(self, future)
                if future.result().success:
                    self.get_logger().info(f"WP cleared!")

                self.set_mode('GUIDED') # Change the mode to GUIDED
            
            # THE FSM STATES #

            if self.FSM_STATE == 'IDLE':
                # Wait HERE until the ASV is called to start #
                if self.start_asv:
                    self.get_logger().info(f"FSM to WAIT_WP state")
                    self.FSM_STATE = 'WAIT_WP'
                    self.start_asv = False  # Deactivate the flag

            elif self.FSM_STATE == 'WAIT_WP':
                # Wait HERe until a WP is received #

                # If the path planner is used, unpack the path into the WP queue #
        
                if self.wp_queue.qsize() > 0:

                    # If the path queue is not empty, get the objective #
                    wp_objective = self.wp_queue.get()

                    # 1) Configure the request #

                    if self.use_path_planner:
                        new_objective = PathPlanner.Request()
                        new_objective.origin.lat = self.asv_position.latitude
                        new_objective.origin.lon = self.asv_position.longitude
                        new_objective.destination.lat = wp_objective.latitude
                        new_objective.destination.lon = wp_objective.longitude

                        # 2) Call the path planner #
                        future = self.path_planner_client.call_async(new_objective)

                        # Wait for the response #
                        rclpy.spin_until_future_complete(self, future)
                        self.get_logger().info(f"Path planner answered")

                        # Get the response #
                        response = future.result()

                        if not response.success:
                            self.get_logger().info(f"Path planner was not successful.")
                            return
                    
                        final_path = response.path.path
                
                    else:
                        # If the path planner is not used, just put the WP as the only WP in the list#
                        final_path = [wp_objective]

                    # Conform a list of WPs #
                    wp_list = []
                    mission_request = WaypointPush.Request()
                    for point in final_path:
                        wp = Waypoint()
                        wp.x_lat = point.lat
                        wp.y_long = point.lon
                        wp.autocontinue = True
                        wp.command = 16
                        wp.frame = 0 # Global frame
                        wp_list.append(wp)
                    
                    mission_request.waypoints = wp_list
                    mission_request.start_index = 0

                    # Push the list of WPs to the vehicle #
                    future = self.wp_push_client.call_async(mission_request)

                    # spin until results
                    rclpy.spin_until_future_complete(self, future)

                    if future.result().success:
                        self.get_logger().info(f"New path injected!")
                    else:
                        self.get_logger().info(f"Could not inject the mission.")
                        return
                    
                    # Get the length of the mission #
                    self.mission_length = len(wp_list)
                    
                    # Change the state #
                    self.FSM_STATE = 'GO_TO_WP'

                else:
                    self.FSM_STATE = 'WAIT_WP'

            elif self.FSM_STATE == 'GO_TO_WP':
                # Initialize the mission #
                self.set_mode('AUTO')  # Set the mode to AUTO
                self.get_logger().info(f"Starting mission!")

                # Change the state #
                self.FSM_STATE = 'WAIT_TO_REACH'

            elif self.FSM_STATE == 'WAIT_TO_REACH':
                # Wait until the WP is reached or until the point is cleared #

                if self.wp_reached:

                    self.FSM_STATE = 'WAIT_WP'
                    self.wp_reached = False
                    self.mission_length = 0 # Reset the mission length
                    # Clear the mission #
                    future = self.wp_clear_client.call_async(WaypointClear.Request())
                    # Wait for the response #
                    rclpy.spin_until_future_complete(self, future)
                    # Change the mode to GUIDED #
                    self.set_mode('GUIDED')

                else:
                    self.FSM_STATE = 'WAIT_TO_REACH'
                

        def set_mode(self, mode):
            # This function changes the mode of the vehicle #

            # Create the request #
            request = SetMode.Request()
            request.custom_mode = mode

            # Send the request #
            future = self.set_mode_point_client.call_async(request)

            # Wait for the response #
            rclpy.spin_until_future_complete(self, future)

            if future.result():
                self.get_logger().info(f"Mode changed to {mode}")

            return future



    def main(args=None):
        #init ROS2
        rclpy.init(args=args)
        
        #start a class that servers the services
        asv_node = ASV_node()
        asv_node.destroy_node()

        #after close connection shut down ROS2
        rclpy.shutdown()


    if __name__ == '__main__':
        main()

Este fragmento de código implementa el bucle principal del nodo ASV (Vehículo Submarino Autónomo). El bucle principal se encarga de manejar el estado del vehículo y las tareas asociadas con el seguimiento de los puntos de ruta (WP) y la ejecución de la misión. Aquí está la explicación detallada del código:

def main_loop(self):: Este método representa el bucle principal del nodo ASV. Esencialmente, implementa una Máquina de Estados Finitos (FSM) que controla el comportamiento del vehículo. Si el modo es manual, ir a IDLE: Si el ASV no está en modo “GUIDED” o “AUTO” o si no está armado, el ASV se coloca en estado “IDLE”. Esto significa que el ASV no está activamente ejecutando ninguna misión y está en espera.

Si se ha borrado el WP, ir a IDLE: Si el WP ha sido borrado (indicado por self.wp_cleared), el ASV se coloca en estado “WAIT_WP”, lo que significa que está esperando a recibir un nuevo WP.

Estados de la MEF: Luego, se evalúa el estado actual de la FSM y se ejecuta el comportamiento correspondiente para cada estado. IDLE: En este estado, el ASV espera hasta que se inicie la misión (start_asv).

set_mode(self, mode): Este método cambia el modo de vuelo del vehículo. Envía una solicitud al servicio set_mode con el nuevo modo especificado. Espera la respuesta y, si se cambia el modo correctamente, registra el cambio en el registro de eventos. main(args=None): Esta es la función principal que inicializa el entorno ROS 2, crea una instancia del nodo ASV, ejecuta el bucle principal y, finalmente, cierra el entorno ROS 2 después de que se cierre la conexión.

Path Planner Node

Esta parte del código define una clase PathPlannerNode que representa un nodo en ROS 2 para planificar rutas. Primero declararemos los parametros,servicios y topics que se vayan a uasr en el nodo:

	def parameters(self):

		# Timeout to compute a path
		self.declare_parameter('path_planner_timeout', 20.0)
		self.path_planner_timeout = self.get_parameter('path_planner_timeout').get_parameter_value().double_value	
		# Base obstacle map path
		self.declare_parameter('navigation_map_path', 'mapas/Alamillo95x216')
		self.navigation_map_path = self.get_parameter('navigation_map_path').get_parameter_value().string_value
		# Debug mode
		self.declare_parameter('debug', True)
		self.debug = self.get_parameter('debug').get_parameter_value().bool_value

		# binary dilation
		self.declare_parameter('binary_dilation', 2)
		self.binary_dilation_size = self.get_parameter('binary_dilation').get_parameter_value().integer_value

		# Load the map

		self.navigation_map = np.genfromtxt(self.navigation_map_path + 'plantilla.csv', dtype=int, delimiter=' ')
		if self.binary_dilation_size > 0:
			self.dilated_navigation_map = binary_dilation(self.navigation_map, np.ones((self.binary_dilation_size, self.binary_dilation_size)))
		else:
			self.dilated_navigation_map = self.navigation_map

		grid_xy_to_lat_long = np.genfromtxt(self.navigation_map_path + 'grid.csv', delimiter=';', dtype=str)

		self.navigation_map_lat_long = np.zeros((2, *self.navigation_map.shape))

		# Transform the map to a float for long and lat
		for i in range(self.navigation_map.shape[0]):
			for j in range(self.navigation_map.shape[1]):
				self.navigation_map_lat_long[0, i, j] = float(grid_xy_to_lat_long[i, j].split(',')[0])
				self.navigation_map_lat_long[1, i, j] = float(grid_xy_to_lat_long[i, j].split(',')[1])
		
		# print(self.navigation_map)
					

	def declare_services(self):
		
		# Create a service to compute a path
		self.path_planner_service = self.create_service(PathPlanner, 'path_planner_service', self.path_planner_callback)

	def declare_topics(self):

		# Subscribe to a topic to receive new obstacles
		self.obstacles_subscriber = self.create_subscription(Location, 'detected_obstacles', self.obstacles_callback, 10)

A continuación vemos el resto del código:


	def __init__(self):
		super().__init__('path_planner_node')

		# Declare parameters, services and topics
		self.parameters()

		# Declare the dijkstra object
		self.dijkstra = Dijkstra(self.dilated_navigation_map, 1)

		self.declare_services()	
		self.declare_topics()
		


	def obstacles_callback(self, msg):

		# Take the lat long and search the closest point in the map
		lat = msg.lat
		long = msg.lon

		x_pos, y_pos = self.lat_long_to_xy(lat, long)

		self.get_logger().info('New obstacle updated at X: %d, Y: %d', x_pos, y_pos)

		# Update the map
		self.navigation_map[x_pos, y_pos] = 1

	def lat_long_to_xy(self, lat, long):

		# Find the closest point in the map
		print(lat)
		distance_lat = np.abs(self.navigation_map_lat_long[0, :, :] - lat)
		distance_long = np.abs(self.navigation_map_lat_long[1, :, :] - long)

		# Find position of minimum distance
		position_lat = np.argmin(distance_lat).astype(int)
		position_lat = np.unravel_index(position_lat, self.navigation_map.shape)[0]

		position_long = np.argmin(distance_long).astype(int)
		position_long = np.unravel_index(position_long, self.navigation_map.shape)[1]

		return position_lat, position_long
	
	def xy_to_lat_long(self, x, y):

		return self.navigation_map_lat_long[0, x, y], self.navigation_map_lat_long[1, x, y]

	def path_planner_callback(self, request, response):

		# Compute the path
		
		start = (request.origin.lat, request.origin.lon)
		goal = (request.destination.lat, request.destination.lon)

	
		start_xy = self.lat_long_to_xy(start[0], start[1])
		goal_xy = self.lat_long_to_xy(goal[0], goal[1])

		self.get_logger().info('New objetive updated: START: {}, GOAL: {}'.format(start_xy, goal_xy))

		response = PathPlanner.Response()
		solution = self.dijkstra.planning(start_xy, goal_xy, self.path_planner_timeout)

		self.get_logger().info('Solution obtained!')

		if solution is None:
			response.success = False
		else:

			response.success = True
			response.path.path = []
			solution = reduce_path(solution, self.dilated_navigation_map).astype(int)

			for point in solution:
				lat, long = self.xy_to_lat_long(point[0], point[1])
				response.path.path.append(Location(lat=lat, lon=long))
			
			response.path.path_length = len(response.path.path)
			
		return response	






def main(args=None):
	rclpy.init(args=args)
	node = PathPlannerNode()
	rclpy.spin(node)
	rclpy.shutdown()

if __name__ == '__main__':
	main()


Función main(args=None): Esta función es la función principal del script. Inicializa el nodo PathPlannerNode, lo hace girar y luego apaga el sistema de ROS 2 cuando termina.

En resumen, este código define un nodo en ROS 2 que planifica rutas utilizando el algoritmo de Dijkstra y responde a los cambios en los obstáculos detectados en el entorno.

MQQT COMUNICATION

Esta parte del código define la clase ServerCommunicationNode, que representa un nodo en ROS 2 responsable de la comunicación con un servidor externo y el manejo de diversos mensajes y servicios. Aquí está la explicación del código:


    def initialize_parameters(self):

        # Declare some parameters #
        self.declare_parameter('internet_loss_timeout', 30.0)
        self.internet_loss_timeout = self.get_parameter('internet_loss_timeout').get_parameter_value().integer_value
        self.declare_parameter('mqtt_addr', "adress")
        self.mqtt_addr = self.get_parameter('mqtt_addr').get_parameter_value().string_value
        self.declare_parameter('mqtt_user', "user")
        self.vehicle_id = get_asv_identity()
        self.mqtt_user = 'asv' + str(get_asv_identity())
        self.declare_parameter('mqtt_password', "password")
        self.mqtt_password = self.get_parameter('mqtt_password').get_parameter_value().string_value

    def declare_topics(self):

        # This node connects to the following topics
        # 1) The state of the vehicle /mavros/state
        # 2) The battery /mavros/battery
        # 3) The start_asv topic /start_asv
        # 4) The wp_target topic /wp_target
        # 5) The position of the ASV /asv_position
        # 6) The wp_clear topic /wp_clear

        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1
        )

        qos_profile_BEF = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1
        )
        qos_profile_REL=rclpy.qos.QoSProfile(
			depth=1,
			reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
			durability=rclpy.qos.QoSDurabilityPolicy.VOLATILE,
			history=rclpy.qos.QoSHistoryPolicy.KEEP_LAST
			)

        # Subscriptions
        self.asv_state_subscription = self.create_subscription(State, '/mavros/state', self.asv_state_callback, qos_profile)
        self.asv_battery_subscription = self.create_subscription(BatteryState, '/mavros/battery', self.asv_battery_callback, qos_profile_BEF)
        self.asv_position_subscription = self.create_subscription(NavSatFix, '/mavros/global_position/global', self.asv_position_callback, qos_profile_BEF)
        self.asv_orientation_subscription = self.create_subscription(PoseStamped, '/mavros/local_position/pose', self.asv_orientation_callback, qos_profile_BEF)

        # Subscriptions to the sensors
        self.wqp_sensor_subscription = self.create_subscription(SensorMsg, '/wqp_measurements', self.wqp_sensor_callback, qos_profile_BEF)
        # Subscriptions to the sonar
        self.sonar_sensor_subscription = self.create_subscription(SonarMsg, '/sonar_measurements', self.sonar_sensor_callback, qos_profile_BEF)
        # Subscriptions to trash point
        self.trash_point_subscription = self.create_subscription(TrashMsg, '/zed2i_trash_detections/trash_localization', self.trash_point_callback, qos_profile_REL)

        # Publications
        self.start_asv_publisher = self.create_publisher(Bool, '/start_asv', qos_profile)
        self.wp_target_publisher = self.create_publisher(GlobalPositionTarget, '/wp_target', qos_profile)
        self.wp_clear_publisher = self.create_publisher(Bool, '/wp_clear', qos_profile)

    def declare_services(self):
        """ Services that this node offers and subscribes to."""

        # Service to change the mode of the ASV
        self.set_mode_srv = self.create_client(SetMode, '/mavros/set_mode')
        self.camera_recording_client = self.create_client(StartSvoRec, '/zed/zed_node/start_svo_rec')
        self.camera_stop_recording_client = self.create_client(Trigger, '/zed/zed_node/stop_svo_rec')


A continuación veremos el resto del codigo:



    def __init__(self):
        super().__init__('communication_node')

        # Get parameters
        self.initialize_parameters()
        # Declare subscribers
        self.declare_topics()
        # Declare services
        self.declare_services()

        self.topic_identity = 'asv/asv' + str(self.vehicle_id)

        self.asv_mode = "MANUAL"
        self.battery = -1
        self.asv_position = {'latitude': 0, 'longitude': 0, 'heading': 0}


        # Declare MQTT topics
        topics = [self.topic_identity + '/start_asv', 
                self.topic_identity + '/wp_clear', 
                self.topic_identity + '/wp_target',
                self.topic_identity + '/asv_mode',
                self.topic_identity + '/camerarecord_on',
                self.topic_identity + '/camerarecord_off']
        
        for topic in topics:
            self.get_logger().info(f"Subscribing to {topic}")

        try:
            self.mqttConnection = MQTT(name=self.topic_identity, 
                                        addr=self.mqtt_addr, 
                                        user=self.mqtt_user, 
                                        password=self.mqtt_password, 
                                        on_message=self.on_message, 
                                        on_disconnect=self.on_disconnect,
                                        topics2suscribe=topics
                                        )
        except ConnectionRefusedError:
            self.get_logger().error(f"Connection to MQTT server was refused")
            self.get_logger().fatal("MQTT module is dead")
            self.destroy_node()
        except OSError:
            self.get_logger().error(f"MQTT server was not found")
            self.get_logger().fatal("MQTT module is dead")
            self.destroy_node()
        except TimeoutError:
            self.get_logger().error(f"MQTT was busy, timeout error")
            self.get_logger().fatal("MQTT module is dead")
            self.destroy_node()
        except:
            error = traceback.format_exc()
            self.get_logger().fatal(f"MQTT connection failed, unknown error:\n {error}")
            self.get_logger().fatal("MQTT module is dead")
            self.destroy_node()

        # Declare timer for publishing data
        self.publishing_timer = self.create_timer(0.5, self.pub_timer_callback)

        # Spin forever
        while rclpy.ok():
            rclpy.spin_once(self)
            sleep(0.1)


    def pub_timer_callback(self):
        # When the timer is triggered, publish the data

        # Publish the complete state of the ASV
        self.mqttConnection.send_new_msg(json.dumps({
            'mode': self.asv_mode,
            'battery': self.battery,
            'Latitude': self.asv_position['latitude'],
            'Longitude': self.asv_position['longitude'],
            'Heading': self.asv_position['heading'],
            'veh_num': self.vehicle_id,
            'date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }), topic = self.topic_identity + '/asv_state')

    def on_disconnect(self,  client,  __, _):

        sleep(1)
        self.get_logger().error("connection to server was lost")

        if not ping_google():
            time_without_internet = 0

            while not ping_google():

                self.get_logger().error("no internet connection, waiting for internet",once=True)
                sleep(1)
                time_without_internet += 1

                if time_without_internet >= self.internet_loss_timeout:
                    self.processing == True
                    self.call_msg.asv_mode = 3 #if we waited for too long, change into manual mode

            self.get_logger().info("Internet connection regained")
        else:
            self.get_logger().error("There is internet, unknown reason, retrying to connect to MQTT")

    def on_message(self, _client, _, msg):
        # This function is called when a message is received

        # Get the topic
        topic = msg.topic
        # Get the payload
        payload = msg.payload.decode("utf-8")

        # Check the topic
        if topic == self.topic_identity + '/start_asv':
            # If the topic is /asv_start, send the start command trough the topic
            msg = Bool()
            msg.data = payload == 1
            self.start_asv_publisher.publish(msg)
            self.get_logger().info("Start ASV command received")

        elif topic == self.topic_identity + '/wp_clear':
            # If the topic is /wp_clear, send the clear command trough the topic
            msg = Bool()
            msg.data = payload == 1
            self.wp_clear_publisher.publish(msg)
            self.get_logger().info("Clean WPs command received")

        elif topic == self.topic_identity + '/wp_target':
            # If the topic is /wp_target, send the WP command trough the topic
            # Format the payload to a dict (From json)
            payload = json.loads(payload)
            self.get_logger().info("New WP received: " + str(payload))
            msg = GlobalPositionTarget()
            try:
                msg.latitude = payload['latitude']
                msg.longitude = payload['longitude']
                # Publish the WP
                self.wp_target_publisher.publish(msg)
            except KeyError:
                self.get_logger().error("The payload of the requested WP is not correct")

        elif topic == self.topic_identity + '/asv_mode':
            # Call the service to change the mode of the ASV
            # Check if the mode is correct
            # Create the request
            request = SetMode.Request()
            request.custom_mode = payload
            # Call the service
            self.set_mode_srv.call_async(request)
            self.get_logger().info("Change mode command received: " + payload)
        
        elif topic == self.topic_identity + '/camerarecord_on':
            current_date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            # Construct the filename with the current date
            filename = f"/root/CameraRecord/ASV_{current_date}.svo"
            self.message_zed = {
                    'svo_filename': filename,
                    'compression_mode': 2,
                    'target_framerate': 30,
                    'bitrate': 6000
                }
            self.get_logger().info("start record")
            call_service_extern(self, self.camera_recording_client, self.message_zed)
        
        elif topic == self.topic_identity + '/camerarecord_off':
            self.message_stop={}
            self.get_logger().info("stop record")
            call_service_extern(self, self.camera_stop_recording_client, self.message_stop)
        else:
            self.get_logger().error("The topic " + topic + " is not recognized")
            self.get_logger().error("The payload is " + str(payload))

    def asv_state_callback(self, msg):
        # This function is called when the state topic is updated
        self.asv_mode = msg.mode
    
    def asv_battery_callback(self, msg):
        # This function is called when the battery topic is updated
        self.battery = msg.percentage

    def asv_position_callback(self, msg):

        self.asv_position['latitude'] = msg.latitude
        self.asv_position['longitude'] =  msg.longitude

    def asv_orientation_callback(self, msg):
         
         euler = quaternion_to_euler([msg.pose.orientation.x, msg.pose.orientation.y, msg.pose.orientation.z, msg.pose.orientation.w])
        
         self.asv_position['heading'] = euler[2]

    def wqp_sensor_callback(self, msg):
        # This function is called when the wqp_sensor topic is updated
        # Check if the message is correct
        if msg.success:
            # If the message is correct, send the message to the MQTT server

            # Format the json msg
            json_msg = json.dumps({
                'conductivity': msg.conductivity,
                'temperature_ct': msg.temperature_ct,
                'turbidity': msg.turbidity,
                'ph': msg.ph,
                'vbat': msg.vbat,
                'lat': msg.lat,
                'lon': msg.lon,
                'date': msg.date,
                'veh_num': self.vehicle_id
            })

            # Send the message
            self.mqttConnection.send_new_msg(json_msg, topic = '/database/wqp')
        else:
            self.get_logger().error("The message received from the WQP sensor is not correct")

    def sonar_sensor_callback(self, msg):
        # This function is called when the sonar_sensor topic is updated
        if msg.success:
            
            json_msg = json.dumps({
                'measurement': msg.distance,
                'Latitude': msg.lat,
                'Longitude': msg.lon,
                'veh_num': self.vehicle_id,
                'date': msg.date
            })

            self.mqttConnection.send_new_msg(json_msg, topic = '/database/sonar')
        else:
            self.get_logger().error("The message received from the sonar sensor is not correct")

    def trash_point_callback(self, msg):
        # This function is called when the sonar_sensor topic is updated
        if msg.success:
            if not isinstance(msg.lat,(int, float)) or numpy.isnan(msg.lat) :
                msg.lat=0.0
                msg.lon=0.0

            json_msg = json.dumps({
                'Latitude': msg.lat,
                'Longitude': msg.lon,
                'veh_num': self.vehicle_id,
                'date': msg.date
            })

            self.mqttConnection.send_new_msg(json_msg, topic = '/database/trash')
        else:
            self.get_logger().error("The message received from the sonar sensor is not correct")

def main(args=None):
    #init ROS2
    rclpy.init(args=args)

    #start a class that servers the services
    server_comms_node = ServerCommunicationNode()
    server_comms_node.destroy_node()

    #after close connection shut down ROS2
    rclpy.shutdown()


if __name__ == '__main__':
    main()

En resumen, este código implementa un nodo en ROS 2 que facilita la comunicación entre el ASV y un servidor externo a través de MQTT, y procesa mensajes relacionados con el estado del ASV, la batería, la posición, y otros datos de sensores.

SONAR NODE

Este código define una clase llamada Sonar_node, que representa un nodo en ROS 2 encargado de la comunicación con el sensor sonar de BlueRobotics.


    def parameters(self):

        self.declare_parameter('sonar_USB_string', '/dev/SONAR')
        self.sonar_USB_string = self.get_parameter('sonar_USB_string').get_parameter_value().string_value

        self.declare_parameter('sonar_baudrate', 115200)
        self.baudrate = self.get_parameter('sonar_baudrate').get_parameter_value().integer_value

        self.declare_parameter('debug', True)
        self.DEBUG = self.get_parameter('debug').get_parameter_value().bool_value

        self.declare_parameter('sonar_measurement_frequency', 1.0)
        self.sonar_measurement_frequency = self.get_parameter('sonar_measurement_frequency').get_parameter_value().double_value
        

    def declare_topics(self):

        qos_profile_BEF = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1
        )

        # Create a publisher for the sonar measurements
        self.sonar_publisher = self.create_publisher(SonarMsg, '/sonar_measurements', qos_profile_BEF)
        self.sonar_publisher_timer = self.create_timer(self.sonar_measurement_frequency, self.sonar_publish)
       
        # Create a subscription to the position of the ASV
        self.asv_position_subscription = self.create_subscription(NavSatFix, '/mavros/global_position/global', self.asv_position_callback, qos_profile_BEF)

    def asv_position_callback(self, msg):

        self.position[0] = msg.latitude
        self.position[1] = msg.longitude

    def connect_to_sonar(self):

        connection_trials = 0
        
        while True:
            # Try sonar connection
            self.ping_device = Ping1D()
            self.ping_device.connect_serial(self.sonar_USB_string, self.baudrate)

            if self.ping_device.get_ping_enable:
                self.get_logger().info(f"Sonar connected!")
                return True
                break
            else:
                self.get_logger().info(f"Sonar not connected! Trial: {connection_trials}")
                connection_trials += 1
                time.sleep(1)

            if connection_trials > 10:
                self.get_logger().info(f"Failed to connect to Sonar")
                break
        
        return False

    
    def __init__(self):
        super().__init__("sonar_service")

        self.parameters()

        self.declare_topics()

        self.sonar_msg = SonarMsg()

        self.position = [0.0, 0.0]

        self.number_of_bad_readings = 0
    

        if self.DEBUG:
            self.get_logger().info(f"Simulating sonar measurements")

        if  not self.DEBUG:

            if self.connect_to_sonar():
                self.get_logger().info(f"Sonar connected")
            else:
                self.get_logger().info(f"Sonar not connected")
                self.free_resources()
                sys.exit(1)
            

    def destroy_usb(self):
        if self.ping_device:
            self.ping_device.close()

    def generate_fake_data(self):

        displaced_lat = self.position[0] - 37.418691117644244
        displaced_long = self.position[1] - 6.001191255201682
        interval_lat = abs(37.418716586727506 - 37.418691117644244)
        interval_long = abs(6.001191255201682 - 6.0007770870275252)
        self.sonar_msg.success = True
        self.sonar_msg.distance = 30 * cos(2*3.141592 * displaced_lat / interval_lat) + 30 * sin(2*3.141592 * displaced_long / interval_long) 
        self.sonar_msg.confidence = 1.0
        self.sonar_msg.lat = self.position[0]
        self.sonar_msg.lon = self.position[1]
        self.sonar_msg.date = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

        return self.sonar_msg

    def sonar_publish(self):

        if not self.DEBUG:

            if self.ping_device and self.ping_device.get_ping_enable: #Si estamos concetados realizamos el checkeo

                data = self.ping_device.get_distance()

                if data is None:

                    self.sonar_msg.distance = -1.0
                    self.sonar_msg.confidence = -1.0
                    self.sonar_msg.success = False
                    self.get_logger().info(f"Bad reading from sonar - None Received")
                    self.number_of_bad_readings += 1

                    if self.number_of_bad_readings > 10:
                        self.get_logger().info("Sonar not working - Trying to reconnect")
                        self.ping_device.close()
                        self.connect_to_sonar()
                else:
                    # Hooraay! We have a good reading

                    self.number_of_bad_readings = 0
                    self.sonar_msg.distance = float(data["distance"])
                    self.sonar_msg.confidence = float(data["confidence"])
                    self.sonar_msg.success = True

            self.sonar_msg.lat = self.position[0]
            self.sonar_msg.lon = self.position[1]
            self.sonar_msg.date = datetime.now().strftime("%d/%m/%Y %H:%M:%S")


        else:
            # We are simulating the sonar
            self.sonar_msg = self.generate_fake_data()
        
        # Send the message
        self.sonar_publisher.publish(self.sonar_msg)
    

    def free_resources(self):
        """ Free the serial port resources """
        self.destroy_usb()

def main(args=None):
    #init ROS2
    rclpy.init(args=args)
    
    sonar_node = Sonar_node()
    rclpy.spin(sonar_node)

    # Destroy the node explicitly
    sonar_node.free_resources()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

SENSOR NODE

    
    #his functions defines and assigns value to the
    def parameters(self):


        self.declare_parameter('debug', False)
        self.DEBUG = self.get_parameter('debug').get_parameter_value().bool_value

        self.declare_parameter('wqp_sensor_USB_string', "/dev/SENSOR")
        self.USB_string = self.get_parameter('wqp_sensor_USB_string').get_parameter_value().string_value

        self.declare_parameter('wqp_sensor_baudrate', 115200)
        self.baudrate = self.get_parameter('wqp_sensor_baudrate').get_parameter_value().integer_value

        self.declare_parameter('wqp_sensor_timeout', 10.0)
        self.timeout = self.get_parameter('wqp_sensor_timeout').get_parameter_value().double_value

        self.declare_parameter('wqp_sensor_measurement_frequency', 1.0)
        self.measurement_frequency = self.get_parameter('wqp_sensor_measurement_frequency').get_parameter_value().double_value
    
    def declare_topics(self):
        
        qos_profile_BEF = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1
        )

         # Create a subscription to the position of the ASV
        self.asv_position_subscription = self.create_subscription(NavSatFix, '/mavros/global_position/global', self.asv_position_callback, qos_profile_BEF)

        # Create a publisher for the sensor measurements
        self.sensor_publisher = self.create_publisher(SensorMsg, '/wqp_measurements', qos_profile_BEF)
        self.sensor_publisher_timer = self.create_timer(self.measurement_frequency, self.sensor_publish)
    

    def asv_position_callback(self, msg):
        # Save the position of the ASV
        self.position[0] = msg.latitude
        self.position[1] = msg.longitude

    def generate_fake_data(self):
        # Generate fake data

        sensor_measurement = SensorMsg()

        sensor_measurement.success = True
        sensor_measurement.conductivity = 0.0
        sensor_measurement.temperature_ct = 0.0
        sensor_measurement.turbidity = 0.0
        sensor_measurement.ph = 0.0
        sensor_measurement.vbat = 0.0
        sensor_measurement.lat = self.position[0]
        sensor_measurement.lon = self.position[1]
        sensor_measurement.date = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

        return sensor_measurement

Método parameters(self): Este método se encarga de inicializar los parámetros necesarios para el funcionamiento del módulo. Declara y asigna valores a los parámetros relacionados con la configuración del sensor, como el puerto USB del sensor (wqp_sensor_USB_string), la velocidad de transmisión (wqp_sensor_baudrate), el tiempo de espera (wqp_sensor_timeout), y la frecuencia de medición (wqp_sensor_measurement_frequency). Además, declara el parámetro debug para habilitar o deshabilitar el modo de depuración.

    

    def __init__(self):

        #start the node
        super().__init__('wqp_sensor_node')
        
        # declare parameter of drone IP
        self.parameters()
        self.declare_topics()

        self.sensor_msg = SensorMsg()

        self.position = [0.0, 0.0]

        self.pattern = r'data=([^,]+),([^,\]]+)'

        if self.DEBUG:
            self.get_logger().warning("Debug mode enabled")

        if  not self.DEBUG:
            connection_trials = 0
            while True:
                # Try sensor connection
                self.serial = serial.Serial(self.USB_string,self.baudrate, timeout=10)
                
                if self.serial.open:
                    self.get_logger().info(f"Sensor connected!")
                    break
                else:
                    self.get_logger().info(f"Sensor not connected! Trial: {connection_trials}")
                    connection_trials += 1
                    time.sleep(1)

                if connection_trials > 10:
                    self.get_logger().info(f"Failed to connect to Sensor")
                    break
        
    
    def read_sensor(self):
        
        # Send the sensor a command to take a sample, and read the response
        read_trials = 0
        read_ok = False

        # First, check if the sensor is connected
        if self.serial.is_open:

            # Wait for the response 100ms
            read_data = ""
            time.sleep(0.1)

            # Try 5 times 
            while(read_trials < 5 or not read_ok):
                    
                    # Flush the input buffer
                    self.serial.read_all()
                    
                    # Send the command to the sensor
                    self.serial.write(bytes("mscan\n",'ascii'))

                    # Wait for the response
                    time.sleep(0.1)

                    # Increment the number of trials
                    read_trials += 1
                    
                    # Read the incoming data byte by byte
                    while(self.serial.in_waiting > 0):

                        new_incoming_data = self.serial.read() # Read one byte

                        try:
                            decoded_data = new_incoming_data.decode()
                            new_character = str(decoded_data)
                        except:
                            self.get_logger().debug(f"Cannot decode incomming byte!")
                            continue
                            
                        # Append the new character to the read data
                        read_data += new_character
    
                        if len(read_data) > 0 and '}' in read_data:
                            # We have read the whole message
                            read_ok = True
                            self.get_logger().debug(f"A message has been read from the sensor: {read_data}")
                            break
            

            if read_ok:
                return read_data
            elif read_trials >= 5:
                self.get_logger().info(f"Too many trials to read the sensor!")
                return None
            else:
                return None


    def status_suscriber_callback(self, msg):
        self.status=msg

    def destroy_usb(self):
        if self.serial.open:
            self.serial.close

    def reconnect_sensor(self):

        # Close the serial port
        if self.serial.is_open:
            self.serial.close() # This is important
        
        del self.serial # This is important

        # Try to reconnect
        connection_trials = 0

        while connection_trials < 10:

            # Try sensor connection
            self.serial = serial.Serial(self.USB_string, self.baudrate, timeout=10)
            
            if self.serial.open:
                self.get_logger().info(f"Sensor connected!")
                return True
            else:
                self.get_logger().info(f"Sensor not connected! Trial: {connection_trials}")
                connection_trials += 1
                time.sleep(0.1)

            if connection_trials > 10:
                self.get_logger().info(f"Failed to connect to Sensor")
                return False
    

    def sensor_publish(self):

        if not self.DEBUG:

            data = self.read_sensor()

            # Check if the data is not empty
            if data is None:

                self.get_logger().info(f"Sensor data is empty!")
                # Try to reconnect to the sensor
                reconnect_response = self.reconnect_sensor()

                # If the reconnection is successful, try to read the sensor again
                if reconnect_response:
                    data = self.read_sensor()
                else:
                    self.get_logger().info(f"Failed to reconnect to the sensor!")
                    return

            # Now we have the data, we can parse it
                
            self.get_logger().info(data)

            self.sensor_msg.success=True
            
            # Find all matches of the pattern in the input string
            matches = re.findall(self.pattern, data)
            
            if len(matches) == 0:
                self.get_logger().info(f"No matches found in the sensor data")
                return

            for match in matches:

                sensor_str = match[0]
                sensor_val = match[1]

                if sensor_str == "Cond":
                    #self.get_logger().info(f"Found Conductivity {sensor_val}")
                    self.sensor_msg.conductivity = float(sensor_val)
                if sensor_str == "TempCT":
                    #self.get_logger().info(f"Found Temperature from an CT.X2 sensor {sensor_val}")
                    self.sensor_msg.temperature_ct = float(sensor_val)
                if sensor_str == "Turbidity":
                    #self.get_logger().info(f"Found Turdibity {sensor_val}")
                    self.sensor_msg.turbidity = float(sensor_val)
                if sensor_str == "pH":
                    #self.get_logger().info(f"Found pH value {sensor_val}")
                    self.sensor_msg.ph = float(sensor_val)
                if sensor_str == "vbat":
                    #self.get_logger().info(f"Found battery value {sensor_val}")
                    self.vbat = float(sensor_val)

            self.sensor_msg.vbat = (62.5*(self.vbat)-425)
            self.sensor_msg.lat = self.position[0]
            self.sensor_msg.lon = self.position[1]
            self.sensor_msg.date = datetime.now().strftime("%d/%m/%Y %H:%M:%S")  
         
        else:
            
            # We are simulating the sensor
            self.sensor_msg = self.generate_fake_data()
        

        # Send the message
        self.sensor_publisher.publish(self.sensor_msg)
            

def main(args=None):
    #init ROS2
    rclpy.init(args=args)
    
    sensor_module = WQP_Sensor_module()
    rclpy.spin(sensor_module)

    # Destroy the node explicitly
    sensor_module.destroy_usb()
    rclpy.shutdown()





if __name__ == '__main__':
    main()


Se inicializan los parámetros del sensor mediante el método parameters() y se declaran los topics mediante el método declare_topics().

Se inicializan algunas variables, como sensor_msg para almacenar los datos del sensor, position para almacenar la posición y pattern para definir el patrón de datos del sensor. Se comprueba si el modo de depuración (DEBUG) está habilitado. Si no está en modo de depuración, se intenta conectar al sensor mediante un bucle while.