diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..35ba40b --- /dev/null +++ b/.env.example @@ -0,0 +1 @@ +export UNIFI_PASSWORD="" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d50a09f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +__pycache__/ diff --git a/BBoxProcessor.py b/BBoxProcessor.py new file mode 100644 index 0000000..1739d5b --- /dev/null +++ b/BBoxProcessor.py @@ -0,0 +1,85 @@ +import time +import math + + +class BBoxProcessor: + + def __init__(self): + self.last_boxes = [] + self.last_timestamps = [] + + def get_joystick_position_from_new_set_of_bboxes(self, new_boxes: list) -> list: + """ + Given a set of bounding boxes, returns the joystick position that would move the robot to the closest one. + """ + new_boxes = self.process_next(new_boxes) + closest_box = self.get_box_closest_to_center(new_boxes) + if closest_box is None: + return [0, 0] + return self.get_normalized_box_position(closest_box) + + def bbox_distance(self, box1: list, box2: list) -> float: + """ + Calculates the distance between two bounding boxes. + """ + return math.sqrt( + ((box1[0] + box1[2] / 2) - (box2[0] + box2[2] / 2)) ** 2 + + ((box1[1] + box1[3] / 2) - (box2[1] + box2[3] / 2)) ** 2 + ) + + def process_next(self, new_boxes, dist_threshold=80) -> list: + """ + Receives a reading of bounding boxes and processes them. + It compares previous bounding boxes with the new ones: If a previous box is close enough to an new one, it is considered the same object. + It also checks if previous boxes have outlived their last appearance, and if so, they are not added to the new list. + """ + timestamp = time.time() + time_to_live = 0.5 + + new_boxes_list = [] + new_timestamps_list = [] + for new_box in new_boxes: + new_boxes_list.append(new_box) + new_timestamps_list.append(timestamp) + + min_distance = dist_threshold + for index, box in enumerate(self.last_boxes): + distance = 9999 + closest = None + for new_box in new_boxes: + new_distance = self.bbox_distance(box, new_box) + if new_distance < min_distance and new_distance < distance: + distance = new_distance + closest = new_box + + if ( + closest is None + and timestamp < self.last_timestamps[index] + time_to_live + ): + new_boxes_list.append(box) + new_timestamps_list.append(timestamp) + + self.last_boxes = new_boxes_list + self.last_timestamps = new_timestamps_list + return new_boxes_list + + def get_box_closest_to_center(self, box_list: list) -> list: + """ + Returns the BBox closest to the center (500, 500) + """ + closest = [] + closest_distance = 9999 + for box in box_list: + distance = math.sqrt( + (box[0] + box[2] / 2 - 500) ** 2 + (box[1] + box[3] / 2 - 500) ** 2 + ) + if distance < closest_distance: + closest = box + closest_distance = distance + return closest + + def get_normalized_box_position(self, box: list) -> list: + """ + Returns the normalized position of a bounding box, shringking its range from 0 to 1000 to 0 to 1 + """ + return [(box[0] + box[2] / 2) / 500.0 - 1, (box[1] + box[3] / 2) / 500 - 1] diff --git a/Communication/ModbusTCP.py b/Communication/ModbusTCP.py new file mode 100644 index 0000000..cee7559 --- /dev/null +++ b/Communication/ModbusTCP.py @@ -0,0 +1,266 @@ +import struct +import random + +from Communication.SocketConnection import SocketConnection + +# All MODBUS/TCP ADU are sent via TCP to registered port 502. +# Remark : the different fields are encoded in Big-endian + +# A Modbus frame is composed of an Application Data Unit (ADU), which encloses a Protocol Data Unit (PDU). +# A dedicated header is used on TCP/IP to identify the MODBUS Application Data Unit. +# It is called the MBAP header (MODBUS Application Protocol header) +# +# ADU = MBAP Header + PDU, +# PDU = Function code + Data. + +# Below the components of the Modbus TCP/IP are listed together with their size in bytes: +# +---------------------------------+ +# | Application Data Unit (ADU) | +# +---------------+-----------------+ +# | **Component** | **Size** (bytes)| +# +---------------+-----------------+ +# | MBAP Header | 7 | +# +---------------+-----------------+ +# | PDU | N | +# +---------------+-----------------+ + +# The MBAP header is 7 bytes long and contains the following fields: +# +------------------------+--------------------+--------------------------------------+ +# | Modbus Application Protocol header (MBAP) | +# +------------------------+--------------------+--------------------------------------+ +# | **Field** | **Length** (bytes) | **Description** | +# +------------------------+--------------------+--------------------------------------+ +# | Transaction identifier | 2 | Identification of a | +# | | | Modbus request/response transaction. | +# +------------------------+--------------------+--------------------------------------+ +# | Protocol identifier | 2 | Protocol ID, 0 = Modbus protocol | +# +------------------------+--------------------+--------------------------------------+ +# | Length | 2 | Number of following bytes | +# +------------------------+--------------------+--------------------------------------+ +# | Unit identifier | 1 | Identification of a remote slave | +# | | | connected on a serial line or bus | +# +------------------------+--------------------+--------------------------------------+ + +# The Protocol Data Unit has a variable length and consists of the following fields: +# +------------------------+--------------------+--------------------------------------+ +# | Protocol Data Unit (PDU) | +# +------------------------+--------------------+--------------------------------------+ +# | **Field** | **Length** (bytes) | **Description** | +# +------------------------+--------------------+--------------------------------------+ +# | Function code | 1 | Function codes as in other variants | +# +------------------------+--------------------+--------------------------------------+ +# | Data | n | Data as response or commands | +# +------------------------+--------------------+--------------------------------------+ + +# Response: +# For a normal response, slave repeats the function code. Should a slave want to report an error, +# it will reply with the requested function code plus 128 (hex 0x80) (3 becomes 131 = hex 0x83), +# and will only include one byte of data, known as the exception code. + +# A normal response frame would look like this: +# +----------------------------------+ +# | Response (ADU) | +# +----------------+-----------------+ +# | **Component** | **Size** (bytes)| +# +----------------+-----------------+ +# | MBAP Header | 7 | +# +----------------+-----------------+ +# | Function code | 1 | +# +----------------+-----------------+ +# | Data | n | +# +----------------+-----------------+ + +# An error response frame is 9 bytes long with the following fields +# +----------------------------------+ +# | Response (ADU) (Error) | +# +----------------+-----------------+ +# | **Component** | **Size** (bytes)| +# +----------------+-----------------+ +# | MBAP Header | 7 | +# +----------------+-----------------+ +# | Function code | 1 | +# +----------------+-----------------+ +# | Exception code | 1 | +# +----------------+-----------------+ + +# For more information on modbus: +# http://www.modbus.org +# http://www.modbus.org/docs/Modbus_Application_Protocol_V1_1b.pdf +# http://www.modbus.org/docs/Modbus_Messaging_Implementation_Guide_V1_0b.pdf + + +class ModbusTCP: + """ + A Modbus communication class designed for use with modbusTCP + """ + __version__ = '0.1' + + # Modbus function code + READ_COILS = 0x01 + READ_HOLDING_REGISTERS = 0x03 + + # Todo: Implement the following modbus functionality: + # READ_DISCRETE_INPUTS = 0x02 + # READ_INPUT_REGISTERS = 0x04 + # WRITE_SINGLE_COIL = 0x05 + # WRITE_SINGLE_REGISTER = 0x06 + # WRITE_MULTIPLE_COILS = 0x0F + # WRITE_MULTIPLE_REGISTERS = 0x10 + + # Todo: Implement error checking with the following exception codes + # Modbus exception code + # ILLEGAL_FUNCTION_CODE = 0x01 + # ILLEGAL_DATA_ACCESS = 0x02 # if the request address is illegal + # ILLEGAL_DATA_VALUE = 0x03 # if the request data is invalid + + def __init__(self, host, port=502): + """ + :param host: IP address to connect with + :param port: Pot (standard 502) to connect with + """ + self.__transaction_id = 0 # For synchronization between messages of server and client + self.__protocol_id = 0 # 0 for Modbus/TCP + self.__unit_id = 0 # Slave address (255 if not used) + + self.pretty_print_response = False # Check to print out response message in console + + self.connection = SocketConnection(host, port) + + def open(self): + """ + Open the socket for communication + """ + self.connection.connect() + + def close(self): + """ + Close the socket + """ + self.connection.disconnect() + + def read_coils(self, bit_address, quantity=1): + """ Main function 1 of Modbus/TCP - 0x01 + + :param bit_address: + :param quantity: + :return: + """ + data_bytes = struct.pack(">HH", bit_address, quantity) + message = self._create_message(self.READ_COILS, data_bytes) + return self._send(message) + + def read_holding_registers(self, reg_address, quantity=1): + """Main function 3 of Modbus/TCP - 0x03. + + Reads the values stored in the registers at the specified addresses. + :param reg_address: Address of first register to read (16-bit) specified in bytes. + :param quantity: Number of registers to read (16-bit) specified in bytes + :return: The values stored in the addresses specified in Bytes + """ + data_bytes = struct.pack(">HH", reg_address, quantity) + message = self._create_message(self.READ_HOLDING_REGISTERS, data_bytes) + return self._send(message) + + def _create_message(self, function_code, data_bytes): + """ + Create packet in bytes format for sending. + :param function_code: bytes + :param data_bytes: bytes + :return: Bytes modbus packet + """ + body = struct.pack('>B', function_code) + data_bytes # create PDU + self.__transaction_id = random.randint(0, 65535) + message_length = 1 + len(body) + header = struct.pack(">HHHB", self.__transaction_id, self.__protocol_id, message_length, self.__unit_id) + return header + body + + def _send(self, adu): + """ Send message over the socket + + :param adu: The data to send over the socket + :return: Bytes response from the other end of the socket + """ + self.open() + self.connection.send(adu) + response = self.connection.receive() + self.close() + + if self.pretty_print_response: + self.pretty_print(response) + + if self._error_check(response): + return None + return response + + def _error_check(self, response): + """ Check if the frame is void of errors + + Raises an exception termination the program + :param response: The ADU to check + :return: None + """ + mbap = response[:7] + function_code = response[7:8] + mbap = struct.unpack(">HHHB", mbap) + + if mbap[0] != self.__transaction_id: + print("Modbus: Transaction ID mismatch" + "\n - Send: {} \n - Response: {}".format(self.__transaction_id, mbap[0])) + return True + elif mbap[1] != self.__protocol_id: + print("Modbus: Protocol ID mismatch" + "\n - Send: {} \n - Response: {}".format(self.__protocol_id, mbap[1])) + return True + elif mbap[3] != self.__unit_id: + print("Modbus: Unit ID mismatch" + "\n - Send: {} \n - Response: {}".format(self.__unit_id, mbap[3])) + return True + elif mbap[2] != len(response[6:]): + print("Modbus: Length mismatch" + "\n - Length: {} \n - Remaining: {}".format(mbap[2], len(response[6:]))) + return True + + function_code = struct.unpack(">B", function_code) + if function_code[0] > 127: + error_code = struct.unpack(">B", response[8:9]) + print("Modbus: Function error: {}".format(error_code)) + return True + + return False + + def set_pretty_print(self, value): + """ + Enable or disable printing of response message in console + :param value: Boolean + """ + self.pretty_print_response = value + + @staticmethod + def pretty_print(response): + """ Print Response in the console + + Unpacks the MBAP and function code in readable format + Data bytes + :param response: + """ + mbap = response[:7] + function_code = response[7:8] + mbap = struct.unpack(">HHHB", mbap) + function_code = struct.unpack(">B", function_code) + + print("+--------------------------------------+") + print("| ****Modbus TCP Response (ADU)**** |") + print("+--------------------------------------+") + print("| **Header information (MBAP)** |") + print("+--------------------------------------+") + print("| Transaction id: " + str(mbap[0])) + print("| Protocol id: " + str(mbap[1])) + print("| Length: " + str(mbap[2])) + print("| Unit id: " + str(mbap[3])) + print("+--------------------------------------+") + print("| **Data information (PDU)** |") + print("+--------------------------------------+") + print("| Function code: " + str(function_code[0])) + print("| Data: " + str(response[8:])) + print("+--------------------------------------+") + print("\n") diff --git a/Communication/SocketConnection.py b/Communication/SocketConnection.py new file mode 100644 index 0000000..55faf59 --- /dev/null +++ b/Communication/SocketConnection.py @@ -0,0 +1,69 @@ +import socket + + +class SocketConnection: + """ + Defines a simple interface for connecting to a socket + """ + def __init__(self, host, port): + """ + :param host: The IP to connect with + :param port: Port to connect with + """ + self.host = host + self.port = port + self.opened = False + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + def connect(self): + """ + Opens a socket connection with the robot for communication. + :return: + """ + if self.opened: + self.disconnect() + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s.settimeout(1) + + try: + self.s.connect((self.host, self.port)) + self.opened = True + except OSError as error: + print("Connecting OS error: {0}".format(error)) + return + return self.s + + def send(self, message): + """ + Send data over the socket connection + :param message: The data to send + :return: + """ + total_send = 0 + while total_send < len(message): + send = self.s.send(message[total_send:]) + if send == 0: + raise RuntimeError("socket connection broken") + total_send = total_send + send + self.s.send(message) + + def receive(self): + """ + Recieve data over the socket connection + :return: + """ + response = self.s.recv(1024) + if len(response) == 0: + raise RuntimeError("socket connection broken") + return response + + def disconnect(self): + """ + Closes the socket connection + :return: + """ + try: + self.s.close() + except OSError as error: + print("Disconnecting OS error: {0}".format(error)) + return diff --git a/Robot/UR/URModbusServer.py b/Robot/UR/URModbusServer.py new file mode 100644 index 0000000..c6367a7 --- /dev/null +++ b/Robot/UR/URModbusServer.py @@ -0,0 +1,163 @@ +from Communication.ModbusTCP import ModbusTCP + +import time, math, struct + +# The robot controller acts as a Modbus TCP server (port 502), +# clients can establish connections to it and send standard MODBUS requests to it. + +# - Note that some Modbus device manufacturers use the terms Master (client) and Slave (server). +# Typically the external IO device is going to be a server and the robot is going to behave as the client +# (requesting and consuming messages from the server). Master=client and Slave=server. +# However, note that the UR controller can be both a server and a client + +# - Note that all values are unsigned, if you want to convert to signed integers, +# program "if (val > 32768): val = val - 65535". +# +# - The MODBUS Server has 0-based addressing. +# Be aware that some other devices are 1-based (e.g. Anybus X-gateways), then just add one to the address +# on that device. (e.g. address 3 on the robot will be address 4 on the Anybus X-gateway) + + +class URModbusServer: + """Give read and write access to data in the robot controller for other devices + + An interface for communicating with the modbus TCP server (port 502) on the UR. + Defines functions for retrieving information from the controller. + Information will be re-requested if an error occurs. + All information will be formatted to human readable information. + """ + + def __init__(self, host): + """ + :param host: IP address to connect with + """ + self.modbusTCP = ModbusTCP(host, 502) + + def get_tcp_position(self): + """ + Connects with the Modbus server to requests Cartesian data of the TCP + :return: Readable cartesian data of TCP, vector in mm, axis in radials + """ + packet = self.modbusTCP.read_holding_registers(400, quantity=6) + + if packet is None: + time.sleep(0.5) + print("Modbus Error: retrying") + return self.get_tcp_position() + else: + x = self._format(packet[9:11]) / 10 + y = self._format(packet[11:13]) / 10 + z = self._format(packet[13:15]) / 10 + rx = self._format(packet[15:17]) / 1000 + ry = self._format(packet[17:19]) / 1000 + rz = self._format(packet[19:21]) / 1000 + return x, y, z, rx, ry, rz + + def get_joint_angles(self, tries=0) -> tuple: + """ + Connects with the Modbus server to requests the angles of each joint, in radians + :return: Readable angle values of each joint in radials + """ + if tries>10: + print("Modbus Error: Failed") + return 0, 0, 0, 0, 0, 0 + + packet = self.modbusTCP.read_holding_registers(270, quantity=6) + packet_signs = self.modbusTCP.read_holding_registers(320, quantity=6) + + if (packet is None) or (packet_signs is None): + time.sleep(0.01) + print(f"Modbus Error #{tries}: retrying") + return self.get_joint_angles(tries+1) + else: + #print(f"Packet: {packet_signs.hex()}") + base = self._format_sign(packet[9:11], packet_signs[9:11]) + shoulder = self._format_sign(packet[11:13], packet_signs[11:13]) + elbow = self._format_sign(packet[13:15], packet_signs[13:15]) + wrist_1 = self._format_sign(packet[15:17], packet_signs[15:17]) + wrist_2 = self._format_sign(packet[17:19], packet_signs[17:19]) + wrist_3 = self._format_sign(packet[19:21], packet_signs[19:21]) + + return base, shoulder, elbow, wrist_1, wrist_2, wrist_3 + + def get_joint_angles_degrees(self, tries=0): + angles = self.get_joint_angles(tries) + return tuple([round(math.degrees(angle), 3) for angle in angles]) + + def get_joint_speeds(self, tries=0): + """ + Connects with the Modbus server to requests the speed of each joint, in radians + :return: Readable angle values of each joint in radials + """ + if tries>10: + print("Modbus Error: Failed") + return 0, 0, 0, 0, 0, 0 + + packet = self.modbusTCP.read_holding_registers(280, quantity=6) + + if (packet is None): + time.sleep(0.01) + print(f"Modbus Error #{tries}: retrying") + return self.get_joint_speeds(tries+1) + else: + #print(f"Packet: {packet_signs.hex()}") + base = self._format(packet[9:11]) / 1000 + shoulder = self._format(packet[11:13]) / 1000 + elbow = self._format(packet[13:15]) / 1000 + wrist_1 = self._format(packet[15:17]) / 1000 + wrist_2 = self._format(packet[17:19]) / 1000 + wrist_3 = self._format(packet[19:21]) / 1000 + + return base, shoulder, elbow, wrist_1, wrist_2, wrist_3 + + @staticmethod + def _format(d): + """Formats signed integers to unsigned float + + :param d: signed integer to format + :return: unsigned float + """ + d = d.hex() + d_i = int(d, 16) + d_f = 0 + + if d_i < 32768: + d_f = float(d_i) + if d_i > 32767: + d_i = 65535 - d_i + d_f = float(d_i) * -1 + + return d_f + + @staticmethod + def _format_2(d): + """Formats signed integers to signed float + + :param d: signed integer to format + :return: signed float + """ + d = d.hex() + d_i = int(d, 16) + d_f = float(d_i) + return d_f + + @staticmethod + def _format_sign(d, sign): + """ + + :param d: + :param sign: + :return: + """ + sign = sign.hex() + sign = int(sign, 16) + + d = d.hex() + d_i = int(d, 16) + d_f = float(d_i) / 1000 + + if sign == 0: + return d_f + else: + d_f = round(d_f - math.radians(360), 3) + return d_f diff --git a/Robot/UR/URRobot.py b/Robot/UR/URRobot.py new file mode 100644 index 0000000..1e71823 --- /dev/null +++ b/Robot/UR/URRobot.py @@ -0,0 +1,160 @@ +from Communication.SocketConnection import SocketConnection +from Robot.UR.URModbusServer import URModbusServer +from Robot.UR.URScript import URScript + + +class URRobot: + """ + Interface for communicating with the UR Robot + SecondaryPort used for sending commands + ModbusServer used for retrieving info + """ + def __init__(self, host): + self.secondaryPort = 30002 + self.secondaryInterface = SocketConnection(host, self.secondaryPort) + self.secondaryInterface.connect() + self.URModbusServer = URModbusServer(host) + self.URScript = URScript() + + # Max safe values of acceleration and velocity are 0.4 + # DO NOT USE THE FOLLOWING VALUES + # MAX a=1.3962634015954636 + # MAX v=1.0471975511965976 + # These are the absolute max for 'safe' movement and should only be used + # if you know what your are doing, otherwise you WILL damage the UR internally + self.acceleration = 0.1 + self.velocity = 0.1 + + def movel(self, pose, a=0.1, v=0.1, joint_p=False): + """Move to position (linear in tool-space) + + See :class:`URScript` for detailed information + """ + script = URScript.movel(pose, a, v, joint_p=joint_p).encode() + return self._send_script(script) + + def movej(self, q, a=0.1, v=0.1, joint_p=True): + """Move to position (linear in joint-space) + + See :class:`URScript` for detailed information + """ + script = URScript.movej(q, a, v, joint_p=joint_p).encode() + return self._send_script(script) + + def speedj(self, qd, a=0.5, t=0): + """Accelerate linearly in joint space and continue with constant joint speed. + + The time t is optional; if provided the function will return after time t, regardless of the target speed has been reached. + If thetime t is not provided, the function will return when the target speed is reached. + + See :class:`URScript` for detailed information + """ + script = URScript.speedj(qd, a, t).encode() + return self._send_script(script) + + def stopj(self, a=1.5): + """Stop (linear in joint space) + + See :class:`URScript` for detailed information + """ + script = URScript.stopj(a).encode() + return self._send_script(script) + + def set_tcp(self, pose): + """Set the Tool Center Point + + See :class:`URScript` for detailed information + """ + script = URScript.set_tcp(pose).encode() + return self._send_script(script) + + def get_tcp_position(self): + """ Get TCP position + + Will return values as seen on the teaching pendant (300.0mm) + :return: 6 Floats - Position data of TCP (x, y, z) in mm (Rx, Ry, Rz) in radials + """ + position_data = self.URModbusServer.get_tcp_position() + return position_data + + def get_joint_angles(self): + """ Get joint angles + + :return: 6 Floats - Position joint angles (base, shoulder, elbow, wrist_1, wrist_2, wrist_3) in radians + """ + position_data = self.URModbusServer.get_joint_angles() + return position_data + + def get_joint_angles_degrees(self): + """ Get joint angles in degrees + + :return: 6 Floats - Position joint angles (base, shoulder, elbow, wrist_1, wrist_2, wrist_3) in degrees + """ + position_data = self.URModbusServer.get_joint_angles_degrees() + return position_data + + def get_joint_speeds(self): + """ Get joint speeds in rads/s + + :return: 6 Floats - Position joint speeds (base, shoulder, elbow, wrist_1, wrist_2, wrist_3) in rads/s + """ + speed_data = self.URModbusServer.get_joint_speeds() + return speed_data + + def set_io(self, io, value): + """ + Set the specified IO + :param io: The IO to set as INT + :param value: Boolean to enable or disable IO + :return: Boolean to check if the command has been send + """ + script = "set_digital_out({}, {})".format(io, value) + "\n" + script = script.encode() + return self._send_script(script) + + def translate(self, vector, a=0.1, v=0.1): + """ Move TCP based on its current position + + Example: + If the TCP position was: 0.4, 0.5, 0.5 and the vector value passed is 0.1, 0.0, 0.0. + It will attempt to translate the TCP to 0.5, 0.5, 0.5 + :param vector: the X, Y, Z to translate to + :param a: tool acceleration [m/2^s] + :param v: tool speed [m/s] + :return: Boolean to check if the command has been send + """ + tcp_pos = list(self.get_tcp_position()) + tcp_pos[0] = tcp_pos[0] / 1000 + vector[0] + tcp_pos[1] = tcp_pos[1] / 1000 + vector[1] + tcp_pos[2] = tcp_pos[2] / 1000 + vector[2] + return self.movel(tcp_pos, a, v) + + def _send_script(self, _script): + """ Send URScript to the UR controller + + :param _script: formatted script to send + :return: Boolean to check if the script has been send + """ + try: + self.secondaryInterface.send(_script) + except OSError as error: + print("OS error: {0}".format(error)) + return False + return True + + @ staticmethod + def format_cartesian_data(cartesian_data): + """ + Formats the vector from human readable mm to robot readable mm + A position of 300mm becomes 0.3 + :param cartesian_data: data to format + :return: formatted data + """ + i = 0 + for position in cartesian_data: + cartesian_data[position] /= (1000 if i < 3 else 1) + i += 1 + formatted_cartesian_data = cartesian_data + + return formatted_cartesian_data + diff --git a/Robot/UR/URScript.py b/Robot/UR/URScript.py new file mode 100644 index 0000000..5b246a5 --- /dev/null +++ b/Robot/UR/URScript.py @@ -0,0 +1,142 @@ +# SW Version 1.8 - URScript manual: +# When connected URScript programs or commands are sent in clear text on the socket. +# Each line is terminated by ’\n’ +# python 3 must use bytes (b"") to send the string + +# When using URScript the robot moves according to the base plane. + +# Joint positions (q) and joint speeds (qd) are represented directly as lists of 6 Floats, one for each robot joint. +# Tool poses (x) are represented as poses also consisting of 6 Floats. +# In a pose, the first 3 coordinates is a position vector and the last 3 an axis-angle + + +class URScript: + """ Defines functions for use with the URRobot. + + Based on URScript manual version 1.8 + """ + __version__ = 1.8 + + @staticmethod + def movec(pose_via, pose_to, a=0.1, v=0.1, r=0, joint_p=False): + """Move Circular: Move to position (circular in tool-space) + + TCP moves on the circular arc segment from current pose, through pose_via to pose_to. + Accelerates to and moves with constant tool speed v + :param pose_via: path point (note: only position is used). + (pose_via can also be specified as joint positions, then forward kinematics is used to + calculate the corresponding pose) + :param pose_to: target pose (pose_to can also be specified as joint positions, then + forward kinematics is used to calculate the corresponding pose + :param a: tool acceleration [m/2^s] + :param v: tool speed [m/s] + :param r: blend radius (of target pose) [m] + :param joint_p: if True, poses are specified as joint positions + :return: string containing the movec script + """ + prefix = "" if joint_p else "p" + return"movec({}[{}, {}, {}, {}, {}, {}], {}[{}, {}, {}, {}, {}, {}], " \ + "a={}, v={}, r={})".format(prefix, *pose_via, prefix, *pose_to, a, v, r) + "\n" + + @staticmethod + def movej(q, a=0.1, v=0.1, t=0, r=0, joint_p=False): + """Move to position (linear in joint-space) + + When using this command, the + robot must be at standstill or come from a movej or movel with a blend. + The speed and acceleration parameters controls the trapezoid speed + profile of the move. The $t$ parameters can be used in stead to set the + time for this move. Time setting has priority over speed and acceleration + settings. The blend radius can be set with the $r$ parameters, to avoid + the robot stopping at the point. However, if he blend region of this mover + overlaps with previous or following regions, this move will be skipped, and + an ’Overlapping Blends’ warning message will be generated. + :param q: joint positions (q can also be specified as a pose, the inverse + kinematics is used to calculate the corresponding joint positions) + :param a: joint acceleration of leading axis [rad/sˆ2] + :param v: joint speed of leading axis [rad/s] + :param t: time [S] + :param r: blend radius [m] + :param joint_p: if True, poses are specified as joint positions + :return: string containing the movej script + """ + prefix = "" if joint_p else "p" + return "movej({}[{}, {}, {}, {}, {}, {}], a={}, v={}, t={}, r={})".format(prefix, *q, a, v, t, r) + "\n" + + @staticmethod + def movel(pose, a=0.1, v=0.1, t=0, r=0, joint_p=False): + """Move to position (linear in tool-space) + + :param pose: target pose (pose can also be specified as joint positions, + then forward kinematics is used to calculate the corresponding pose) + :param a: tool acceleration [m/2^s] + :param v: tool speed [m/s] + :param t: time [S] + :param r: blend radius [m] + :param joint_p: if True, poses are specified as joint positions + :return: string containing the movel script + """ + prefix = "" if joint_p else "p" + return "movel({}[{}, {}, {}, {}, {}, {}], a={}, v={}, t={}, r={})".format(prefix, *pose, a, v, t, r) + "\n" + + @staticmethod + def movep(pose, a=0.1, v=0.1, t=0, r=0, joint_p=False): + """Move Process + + Blend circular (in tool-space) and move linear (in tool-space) to position. + Accelerates to and moves with constant tool speed v. + :param pose: target pose (pose can also be specified as joint positions, + then forward kinematics is used to calculate the corresponding pose + :param a: tool acceleration [m/2^s] + :param v: tool speed [m/s] + :param t: time [S] + :param r: blend radius [m] + :param joint_p: if True, poses are specified as joint positions + :return: string containing the movep script + """ + prefix = "" if joint_p else "p" + return "movep({}[{}, {}, {}, {}, {}, {}], a={}, v={}, t={}, r={})" .format(prefix, *pose, a, v, t, r) + "\n" + + @staticmethod + def speedj(qd, a=0.1, t=0): + """Accelerate linearly in joint space and continue with constant joint speed. + + The time t is optional; if provided the function will return after time t, regardless of the target speed has been reached. + If the time t is not provided, the function will return when the target speed is reached. + :param qd: joint speeds [rad/s] + :param a: joint acceleration [rad/sˆ2] + :param t: time [S]. Optional. Will not consider if equals or bellow 0 + :return: string containing the speedj script + """ + speedtime = "" if t <= 0 else ", t=" + str(t) + return "speedj([{}, {}, {}, {}, {}, {}], a={}{})".format(*qd, a, speedtime) + "\n" + + @staticmethod + def set_tcp(pose): + """Set the Tool Center Point + + Sets the transformation from the output flange coordinate system to the TCP as a pose. + :param pose: A pose representing the offset in x, y z and the rotation of the tcp around x, y and z + :return: A string containing the set tcp script + """ + return "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*pose) + "\n" + + @staticmethod + def stopj(a=1.5): + """Stop (linear in joint space) + + Decelerate joint speeds to zero + :param a: joint acceleration [rad/sˆ2] (of leading axis) + :return: String containing the stopj script + """ + return "stopj({})".format(a) + "\n" + + @staticmethod + def stopl(a=0.5): + """Stop (linear in tool space) + + Decelerate tool speeds to zero + :param a: joint acceleration [rad/sˆ2] (of leading axis) + :return: String containing the stopj script + """ + return "stopl({})".format(a) + "\n" diff --git a/URSentry.py b/URSentry.py new file mode 100644 index 0000000..0158d53 --- /dev/null +++ b/URSentry.py @@ -0,0 +1,175 @@ +from Robot.UR.URRobot import URRobot +from time import sleep +import math +import threading + + +class URSentry: + def __init__(self, host): + self.robot = URRobot(host) + self.sentry_pose = [1.571, -1.571, 0.785, -0.785, -1.571, 1.318] + self.forward_pose = [1.571, -1.949, 1.974, -2.844, -1.571, 1.318] + self.robot_speed = [0, 0, 0, 0, 0, 0] + self.detections = [] + + # Flags + self.await_stop = False + self.await_stop_ticks = 0 + + self.send_to_zero_on_stop = False + + # Smooth stopping + self.smooth_stop_delayed_call = None + + def get_joint_angles(self) -> "list[float]": + """ + Get the current joint angles of the robot in degrees + """ + return self.robot.get_joint_angles_degrees() + + def sentry_position(self, a=0.5, v=1.5): + self.robot.movej(self.sentry_pose, a=0.5, v=1.5) + + def forward_position(self, a=0.5, v=1.5): + self.robot.movej(self.forward_pose, a, v) + + def update_detections(self, detections): + self.detections = detections + + def forward_position_to_base_angle_degrees(self, base_angle, a=0.5, v=1.5): + pose = self.forward_pose + pose[0] = math.radians(base_angle) + self.robot.movej(pose, a, v) + + def set_base_speed(self, speed): + self.robot_speed[0] = speed + + def set_neck_speed(self, speed): + self.robot_speed[4] = speed + + def smooth_stop(self): + # print("Smooth stopping --------------------") + self.robot.speedj([0, 0, 0, 0, 0, 0], 1.5) + + def lerp(self, a, b, t): + return a + t * (b - a) + + def move_robot_base(self, joystick_pos_x: float): + """ + Move the robot based on a joystick input, + where the the center is (0, 0) and the bottom right corner is (1, 1). + + Horizontally, the speed of the base is adjusted on how far left or right the detection is. + We take into account the maximum rotation limit, trying to slow down when nearing +-360 degrees, + and trying to never reach that point. If needed, the robot should attempt to do a full rotation the other way to keep following + the target. + This can be changed so we first change the horizontal "neck" (wrist_2) until it reaches a limit, and then rotate the base, + which would result in a more natural movement + + Vertically, the speed of the vertical "neck" (wrist_1) is adjusted based on how far up or down the detection is. + We limit it's rotation to +- 45 degrees, as we do not need more to follow the target. + This can be changed in the future to acomplish more natural movements, by also adjusting the speed of the shoulder joint. + + Movement is done entirely through speed control, which should result in a more fluid movement compared to pose control. + + If flag await_stop is set True, we check if the current speed is 0 wait until all joints are stopped before moving again. + + """ + # We omit all processing until the robot has stopped moving for a certain amount of calls ('ticks') + + # print("Base Speed: ", self.robot_speed[0]) + + ticks_to_wait = 10 + if self.await_stop: + joint_speeds = self.robot.get_joint_speeds() + if all([speed == 0 for speed in joint_speeds]): + self.await_stop_ticks += 1 + if self.await_stop_ticks > ticks_to_wait: + self.await_stop = False + self.await_stop_ticks = 0 + return + + if self.send_to_zero_on_stop: + self.await_stop = True + self.send_to_zero_on_stop = False + self.forward_position_to_base_angle_degrees(0) + + # Cancel smooth stopping due to no input + if self.smooth_stop_delayed_call is not None: + self.smooth_stop_delayed_call.cancel() + self.smooth_stop_delayed_call = None + + # # Center detections, so that the center of the image is (0, 0) + # # We also invert the y axis, so that the upper part of the image is positive + # # Thus: lower left corner is (-500, -500) and upper right corner is (500, 500) + # joystick_pos_x -= 500 + # joystick_pos_y -= 500 + # joystick_pos_y = -joystick_pos_y + + # Deadzones where we still consider the target in the middle, to avoid jittering + horizontal_dead_zone_radius = 0.1 + vertical_dead_zone_radius = 0.1 + + # Speeds for the base and neck joints + base_max_speed = 0.8 + base_min_speed = 0.3 + + base_acceleration = 1.5 + + # Maximum and minimum angles + base_max_angle = 315 + base_danger_angle = 340 + + if abs(joystick_pos_x) < horizontal_dead_zone_radius: + # We are in the deadzone, so we do not need to move the base, or stop moving it + if self.robot_speed[0] == 0: + return + self.set_base_speed(0) + + else: + current_pose = self.get_joint_angles() + base_angle = current_pose[0] + # print("Base angle: ", base_angle) + # Check if we are past the maximum angle + if abs(base_angle) > base_max_angle: + # We are past the maximum angle, so we undo a rotation by sending to 0 + self.send_to_zero_on_stop = True + self.await_stop = True + self.set_base_speed(0) + self.robot.speedj(self.robot_speed, base_acceleration, 1) + return + + # We are not in the deadzone, so we need to move the base + # Calculate the speed based on the distance from the center + direction = math.copysign(1, joystick_pos_x) + base_speed = ( + self.lerp(base_min_speed, base_max_speed, abs(joystick_pos_x)) + * direction + ) + self.set_base_speed(base_speed) + + # Schedule smooth stop if no input is given for a second + self.smooth_stop_delayed_call = threading.Timer( + 0.4, self.move_robot_base, args=[0] + ) + self.smooth_stop_delayed_call.start() + + # print("Robot speed: ", self.robot_speed) + self.robot.speedj(self.robot_speed, base_acceleration, 1) + + +if __name__ == "__main__": + host = "172.22.114.160" + sentry = URSentry(host) + # sentry.sentry_position() + # sleep(3) + sentry.forward_position() + sleep(3) + sentry.robot.speedj([0.8, 0, 0, 0, 0, 0], 0.5, 3) + sleep(2) + sentry.smooth_stop() + sleep(2) + sentry.forward_position_to_base_angle_degrees(45) + + # while True: + # print(sentry.robot.get_joint_speeds()[0]) diff --git a/UnifiWebsockets/Unifi.py b/UnifiWebsockets/Unifi.py new file mode 100644 index 0000000..d4bda76 --- /dev/null +++ b/UnifiWebsockets/Unifi.py @@ -0,0 +1,75 @@ +import asyncio +import queue +import json +import requests +import websockets +import ssl +import re +import dotenv + +# from UnifiWebsockets.decode import decode_packet + + +# REST login script to get the CSRF token +def get_token(base_url, username, password): + login_url = f"{base_url}/api/auth/login" + payload = {"username": username, "password": password} + headers = {"Content-Type": "application/json"} + try: + session = requests.Session() + response = session.post(login_url, json=payload, headers=headers, verify=False) + response.raise_for_status() # Raise an exception for HTTP errors + return session.cookies + except requests.exceptions.RequestException as e: + print(f"Failed to obtain token: {e}") + return None, None + + +# Function to connect to the websocket and print the event stream +async def listen_to_event_stream(ws_url, cookies, queue): + cookie_header = "".join([f"{key}={value}" for key, value in cookies.items()]) + headers = {"Cookie": cookie_header} + + ssl_context = ssl._create_unverified_context() # Create an unverified SSL context + try: + async with websockets.connect( + ws_url, extra_headers=headers, ssl=ssl_context + ) as websocket: + while True: + try: + message = await websocket.recv() + message = str(message) + + queue.put(find_coords(message)) + # print(find_coords(message)) + + # coords_begin = message.find('coord') + 7 + # coords_end = message[coords_begin:].find(']') + # print(message[coords_begin:]) + + # print(decode_packet(message)) + except websockets.ConnectionClosed: + print("Connection closed") + break + except Exception as e: + print(f"WebSocket connection failed: {e}") + + +def find_coords(message: str) -> list[str]: + coords_begin = [m.start() + 8 for m in re.finditer('"coord":', message)] + coords_end = [m.start() + 1 for m in re.finditer('],"depth":', message)] + coords = [ + json.loads(message[coords_begin[i] : coords_end[i]]) + for i in range(len(coords_begin)) + ] + return coords + + +def run(q: queue.Queue) -> None: + base_url = "https://172.22.114.176" # Replace with your UniFi Protect base URL + username = "engr-ugaif" # Replace with your username + password = dotenv.get_key(dotenv.find_dotenv(), "UNIFI_PASSWORD") + ws_url = "wss://172.22.114.176/proxy/protect/ws/liveDetectTrack?camera=668daa1e019ce603e4002d31" # Replace with your WebSocket URL + + cookies = get_token(base_url, username, password) + asyncio.run(listen_to_event_stream(ws_url, cookies, q)) diff --git a/UnifiWebsockets/decode.py b/UnifiWebsockets/decode.py new file mode 100644 index 0000000..8d5783d --- /dev/null +++ b/UnifiWebsockets/decode.py @@ -0,0 +1,108 @@ +import zlib +import struct +import json +from typing import Tuple, Union + +# Constants +EVENT_PACKET_HEADER_SIZE = 8 + +# Packet Types +HEADER = 1 +PAYLOAD = 2 + +# Payload Types +JSON_TYPE = 1 +STRING_TYPE = 2 +BUFFER_TYPE = 3 + +def decode_packet(packet: bytes) -> Union[Tuple[dict, Union[dict, str, bytes]], None]: + try: + data_offset = struct.unpack('>I', packet[ProtectEventPacketHeader.PAYLOAD_SIZE:ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0] + EVENT_PACKET_HEADER_SIZE + + if len(packet) != (data_offset + EVENT_PACKET_HEADER_SIZE + struct.unpack('>I', packet[data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE:data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0]): + raise ValueError("Packet length doesn't match header information.") + except Exception as error: + print(f"Error decoding update packet: {error}") + return None + + header_frame = decode_frame(packet[:data_offset], HEADER) + payload_frame = decode_frame(packet[data_offset:], PAYLOAD) + + if not header_frame or not payload_frame: + return None + + return header_frame, payload_frame + +def decode_frame(packet: bytes, packet_type: int) -> Union[dict, str, bytes, None]: + frame_type = packet[ProtectEventPacketHeader.TYPE] + + if packet_type != frame_type: + return None + + payload_format = packet[ProtectEventPacketHeader.PAYLOAD_FORMAT] + is_deflated = packet[ProtectEventPacketHeader.DEFLATED] + + if is_deflated: + payload = zlib.decompress(packet[EVENT_PACKET_HEADER_SIZE:]) + else: + payload = packet[EVENT_PACKET_HEADER_SIZE:] + + if frame_type == HEADER: + if payload_format == JSON_TYPE: + return json.loads(payload.decode('utf-8')) + else: + return None + + if payload_format == JSON_TYPE: + return json.loads(payload.decode('utf-8')) + elif payload_format == STRING_TYPE: + return payload.decode('utf-8') + elif payload_format == BUFFER_TYPE: + return payload + else: + print(f"Unknown payload packet type received in the realtime events API: {payload_format}") + return None + +def decode_packet_from_mac(packet: bytes, mac: str) -> Union[Tuple[dict, Union[dict, str, bytes]], None]: + try: + data_offset = struct.unpack('>I', packet[ProtectEventPacketHeader.PAYLOAD_SIZE:ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0] + EVENT_PACKET_HEADER_SIZE + + if len(packet) != (data_offset + EVENT_PACKET_HEADER_SIZE + struct.unpack('>I', packet[data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE:data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0]): + raise ValueError("Packet length doesn't match header information.") + except Exception as error: + print(f"Error decoding update packet: {error}") + return None + + header_frame = decode_frame(packet[:data_offset], HEADER) + + if not header_frame: + return None + + # Check if modelKey is 'camera' and mac matches + if header_frame.get('modelKey') != 'camera' or header_frame.get('mac') != mac: + return None + + payload_frame = decode_frame(packet[data_offset:], PAYLOAD) + + if not payload_frame: + return None + + return header_frame, payload_frame + +class ProtectEventPacketHeader: + TYPE = 0 + PAYLOAD_FORMAT = 1 + DEFLATED = 2 + UNKNOWN = 3 + PAYLOAD_SIZE = 4 + +# Example usage: +# packet = b'...' # binary string from websocket +# mac_address = 'XX:XX:XX:XX:XX:XX' # Replace with the desired MAC address +# result = decode_packet_from_mac(packet, mac_address) +# if result: +# action_frame, data_frame = result +# print("Action Frame:", action_frame) +# print("Data Frame:", data_frame) +# else: +# print("No matching packet found.") \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..05aa3ca --- /dev/null +++ b/main.py @@ -0,0 +1,65 @@ +from UnifiWebsockets import Unifi +from URSentry import URSentry +import queue +import time +import threading +import BBoxProcessor + +q = queue.Queue() + +x = threading.Thread(target=Unifi.run, args=(q,)) +x.start() + +b = BBoxProcessor.BBoxProcessor() + +ur = URSentry("172.22.114.160") + +joystick_queue = queue.Queue() + + +class CameraJoystick(threading.Thread): + def __init__(self, joystick_queue: queue.Queue[list[int]]): + threading.Thread.__init__(self) + self.joystick_queue = joystick_queue + self.keep_running = True + + def run(self): + while self.keep_running: + bb = b.get_joystick_position_from_new_set_of_bboxes(q.get()) + print(bb) + self.joystick_queue.put(bb) + + def stop(self): + self.keep_running = False + + +current_joystick = [0, 0] + + +class JoystickScheduler(threading.Thread): + def __init__(self, joystick_queue: queue.Queue[list[int]]): + threading.Thread.__init__(self) + self.joystick_queue = joystick_queue + self.keep_running = True + + def run(self): + while self.keep_running: + global current_joystick + current_joystick = self.joystick_queue.get() + + def stop(self): + self.keep_running = False + + +camera_joystick = CameraJoystick(joystick_queue) +camera_joystick.start() +joystick_scheduler = JoystickScheduler(joystick_queue) +joystick_scheduler.start() + +ur.await_stop = True +ur.forward_position() + +while True: + ur.move_robot_base(-current_joystick[0]) + print("Setting robot base velocity to: ", current_joystick[0]) + time.sleep(0.2)