robot looks at you

This commit is contained in:
Hunter 2024-07-19 14:35:10 -04:00
parent 19bd8a2f6f
commit 482d05f252
12 changed files with 1311 additions and 0 deletions

1
.env.example Normal file
View File

@ -0,0 +1 @@
export UNIFI_PASSWORD=""

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.env
__pycache__/

85
BBoxProcessor.py Normal file
View File

@ -0,0 +1,85 @@
import time
import math
class BBoxProcessor:
def __init__(self):
self.last_boxes = []
self.last_timestamps = []
def get_joystick_position_from_new_set_of_bboxes(self, new_boxes: list) -> list:
"""
Given a set of bounding boxes, returns the joystick position that would move the robot to the closest one.
"""
new_boxes = self.process_next(new_boxes)
closest_box = self.get_box_closest_to_center(new_boxes)
if closest_box is None:
return [0, 0]
return self.get_normalized_box_position(closest_box)
def bbox_distance(self, box1: list, box2: list) -> float:
"""
Calculates the distance between two bounding boxes.
"""
return math.sqrt(
((box1[0] + box1[2] / 2) - (box2[0] + box2[2] / 2)) ** 2
+ ((box1[1] + box1[3] / 2) - (box2[1] + box2[3] / 2)) ** 2
)
def process_next(self, new_boxes, dist_threshold=80) -> list:
"""
Receives a reading of bounding boxes and processes them.
It compares previous bounding boxes with the new ones: If a previous box is close enough to an new one, it is considered the same object.
It also checks if previous boxes have outlived their last appearance, and if so, they are not added to the new list.
"""
timestamp = time.time()
time_to_live = 0.5
new_boxes_list = []
new_timestamps_list = []
for new_box in new_boxes:
new_boxes_list.append(new_box)
new_timestamps_list.append(timestamp)
min_distance = dist_threshold
for index, box in enumerate(self.last_boxes):
distance = 9999
closest = None
for new_box in new_boxes:
new_distance = self.bbox_distance(box, new_box)
if new_distance < min_distance and new_distance < distance:
distance = new_distance
closest = new_box
if (
closest is None
and timestamp < self.last_timestamps[index] + time_to_live
):
new_boxes_list.append(box)
new_timestamps_list.append(timestamp)
self.last_boxes = new_boxes_list
self.last_timestamps = new_timestamps_list
return new_boxes_list
def get_box_closest_to_center(self, box_list: list) -> list:
"""
Returns the BBox closest to the center (500, 500)
"""
closest = []
closest_distance = 9999
for box in box_list:
distance = math.sqrt(
(box[0] + box[2] / 2 - 500) ** 2 + (box[1] + box[3] / 2 - 500) ** 2
)
if distance < closest_distance:
closest = box
closest_distance = distance
return closest
def get_normalized_box_position(self, box: list) -> list:
"""
Returns the normalized position of a bounding box, shringking its range from 0 to 1000 to 0 to 1
"""
return [(box[0] + box[2] / 2) / 500.0 - 1, (box[1] + box[3] / 2) / 500 - 1]

266
Communication/ModbusTCP.py Normal file
View File

@ -0,0 +1,266 @@
import struct
import random
from Communication.SocketConnection import SocketConnection
# All MODBUS/TCP ADU are sent via TCP to registered port 502.
# Remark : the different fields are encoded in Big-endian
# A Modbus frame is composed of an Application Data Unit (ADU), which encloses a Protocol Data Unit (PDU).
# A dedicated header is used on TCP/IP to identify the MODBUS Application Data Unit.
# It is called the MBAP header (MODBUS Application Protocol header)
#
# ADU = MBAP Header + PDU,
# PDU = Function code + Data.
# Below the components of the Modbus TCP/IP are listed together with their size in bytes:
# +---------------------------------+
# | Application Data Unit (ADU) |
# +---------------+-----------------+
# | **Component** | **Size** (bytes)|
# +---------------+-----------------+
# | MBAP Header | 7 |
# +---------------+-----------------+
# | PDU | N |
# +---------------+-----------------+
# The MBAP header is 7 bytes long and contains the following fields:
# +------------------------+--------------------+--------------------------------------+
# | Modbus Application Protocol header (MBAP) |
# +------------------------+--------------------+--------------------------------------+
# | **Field** | **Length** (bytes) | **Description** |
# +------------------------+--------------------+--------------------------------------+
# | Transaction identifier | 2 | Identification of a |
# | | | Modbus request/response transaction. |
# +------------------------+--------------------+--------------------------------------+
# | Protocol identifier | 2 | Protocol ID, 0 = Modbus protocol |
# +------------------------+--------------------+--------------------------------------+
# | Length | 2 | Number of following bytes |
# +------------------------+--------------------+--------------------------------------+
# | Unit identifier | 1 | Identification of a remote slave |
# | | | connected on a serial line or bus |
# +------------------------+--------------------+--------------------------------------+
# The Protocol Data Unit has a variable length and consists of the following fields:
# +------------------------+--------------------+--------------------------------------+
# | Protocol Data Unit (PDU) |
# +------------------------+--------------------+--------------------------------------+
# | **Field** | **Length** (bytes) | **Description** |
# +------------------------+--------------------+--------------------------------------+
# | Function code | 1 | Function codes as in other variants |
# +------------------------+--------------------+--------------------------------------+
# | Data | n | Data as response or commands |
# +------------------------+--------------------+--------------------------------------+
# Response:
# For a normal response, slave repeats the function code. Should a slave want to report an error,
# it will reply with the requested function code plus 128 (hex 0x80) (3 becomes 131 = hex 0x83),
# and will only include one byte of data, known as the exception code.
# A normal response frame would look like this:
# +----------------------------------+
# | Response (ADU) |
# +----------------+-----------------+
# | **Component** | **Size** (bytes)|
# +----------------+-----------------+
# | MBAP Header | 7 |
# +----------------+-----------------+
# | Function code | 1 |
# +----------------+-----------------+
# | Data | n |
# +----------------+-----------------+
# An error response frame is 9 bytes long with the following fields
# +----------------------------------+
# | Response (ADU) (Error) |
# +----------------+-----------------+
# | **Component** | **Size** (bytes)|
# +----------------+-----------------+
# | MBAP Header | 7 |
# +----------------+-----------------+
# | Function code | 1 |
# +----------------+-----------------+
# | Exception code | 1 |
# +----------------+-----------------+
# For more information on modbus:
# http://www.modbus.org
# http://www.modbus.org/docs/Modbus_Application_Protocol_V1_1b.pdf
# http://www.modbus.org/docs/Modbus_Messaging_Implementation_Guide_V1_0b.pdf
class ModbusTCP:
"""
A Modbus communication class designed for use with modbusTCP
"""
__version__ = '0.1'
# Modbus function code
READ_COILS = 0x01
READ_HOLDING_REGISTERS = 0x03
# Todo: Implement the following modbus functionality:
# READ_DISCRETE_INPUTS = 0x02
# READ_INPUT_REGISTERS = 0x04
# WRITE_SINGLE_COIL = 0x05
# WRITE_SINGLE_REGISTER = 0x06
# WRITE_MULTIPLE_COILS = 0x0F
# WRITE_MULTIPLE_REGISTERS = 0x10
# Todo: Implement error checking with the following exception codes
# Modbus exception code
# ILLEGAL_FUNCTION_CODE = 0x01
# ILLEGAL_DATA_ACCESS = 0x02 # if the request address is illegal
# ILLEGAL_DATA_VALUE = 0x03 # if the request data is invalid
def __init__(self, host, port=502):
"""
:param host: IP address to connect with
:param port: Pot (standard 502) to connect with
"""
self.__transaction_id = 0 # For synchronization between messages of server and client
self.__protocol_id = 0 # 0 for Modbus/TCP
self.__unit_id = 0 # Slave address (255 if not used)
self.pretty_print_response = False # Check to print out response message in console
self.connection = SocketConnection(host, port)
def open(self):
"""
Open the socket for communication
"""
self.connection.connect()
def close(self):
"""
Close the socket
"""
self.connection.disconnect()
def read_coils(self, bit_address, quantity=1):
""" Main function 1 of Modbus/TCP - 0x01
:param bit_address:
:param quantity:
:return:
"""
data_bytes = struct.pack(">HH", bit_address, quantity)
message = self._create_message(self.READ_COILS, data_bytes)
return self._send(message)
def read_holding_registers(self, reg_address, quantity=1):
"""Main function 3 of Modbus/TCP - 0x03.
Reads the values stored in the registers at the specified addresses.
:param reg_address: Address of first register to read (16-bit) specified in bytes.
:param quantity: Number of registers to read (16-bit) specified in bytes
:return: The values stored in the addresses specified in Bytes
"""
data_bytes = struct.pack(">HH", reg_address, quantity)
message = self._create_message(self.READ_HOLDING_REGISTERS, data_bytes)
return self._send(message)
def _create_message(self, function_code, data_bytes):
"""
Create packet in bytes format for sending.
:param function_code: bytes
:param data_bytes: bytes
:return: Bytes modbus packet
"""
body = struct.pack('>B', function_code) + data_bytes # create PDU
self.__transaction_id = random.randint(0, 65535)
message_length = 1 + len(body)
header = struct.pack(">HHHB", self.__transaction_id, self.__protocol_id, message_length, self.__unit_id)
return header + body
def _send(self, adu):
""" Send message over the socket
:param adu: The data to send over the socket
:return: Bytes response from the other end of the socket
"""
self.open()
self.connection.send(adu)
response = self.connection.receive()
self.close()
if self.pretty_print_response:
self.pretty_print(response)
if self._error_check(response):
return None
return response
def _error_check(self, response):
""" Check if the frame is void of errors
Raises an exception termination the program
:param response: The ADU to check
:return: None
"""
mbap = response[:7]
function_code = response[7:8]
mbap = struct.unpack(">HHHB", mbap)
if mbap[0] != self.__transaction_id:
print("Modbus: Transaction ID mismatch"
"\n - Send: {} \n - Response: {}".format(self.__transaction_id, mbap[0]))
return True
elif mbap[1] != self.__protocol_id:
print("Modbus: Protocol ID mismatch"
"\n - Send: {} \n - Response: {}".format(self.__protocol_id, mbap[1]))
return True
elif mbap[3] != self.__unit_id:
print("Modbus: Unit ID mismatch"
"\n - Send: {} \n - Response: {}".format(self.__unit_id, mbap[3]))
return True
elif mbap[2] != len(response[6:]):
print("Modbus: Length mismatch"
"\n - Length: {} \n - Remaining: {}".format(mbap[2], len(response[6:])))
return True
function_code = struct.unpack(">B", function_code)
if function_code[0] > 127:
error_code = struct.unpack(">B", response[8:9])
print("Modbus: Function error: {}".format(error_code))
return True
return False
def set_pretty_print(self, value):
"""
Enable or disable printing of response message in console
:param value: Boolean
"""
self.pretty_print_response = value
@staticmethod
def pretty_print(response):
""" Print Response in the console
Unpacks the MBAP and function code in readable format
Data bytes
:param response:
"""
mbap = response[:7]
function_code = response[7:8]
mbap = struct.unpack(">HHHB", mbap)
function_code = struct.unpack(">B", function_code)
print("+--------------------------------------+")
print("| ****Modbus TCP Response (ADU)**** |")
print("+--------------------------------------+")
print("| **Header information (MBAP)** |")
print("+--------------------------------------+")
print("| Transaction id: " + str(mbap[0]))
print("| Protocol id: " + str(mbap[1]))
print("| Length: " + str(mbap[2]))
print("| Unit id: " + str(mbap[3]))
print("+--------------------------------------+")
print("| **Data information (PDU)** |")
print("+--------------------------------------+")
print("| Function code: " + str(function_code[0]))
print("| Data: " + str(response[8:]))
print("+--------------------------------------+")
print("\n")

View File

@ -0,0 +1,69 @@
import socket
class SocketConnection:
"""
Defines a simple interface for connecting to a socket
"""
def __init__(self, host, port):
"""
:param host: The IP to connect with
:param port: Port to connect with
"""
self.host = host
self.port = port
self.opened = False
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def connect(self):
"""
Opens a socket connection with the robot for communication.
:return:
"""
if self.opened:
self.disconnect()
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.settimeout(1)
try:
self.s.connect((self.host, self.port))
self.opened = True
except OSError as error:
print("Connecting OS error: {0}".format(error))
return
return self.s
def send(self, message):
"""
Send data over the socket connection
:param message: The data to send
:return:
"""
total_send = 0
while total_send < len(message):
send = self.s.send(message[total_send:])
if send == 0:
raise RuntimeError("socket connection broken")
total_send = total_send + send
self.s.send(message)
def receive(self):
"""
Recieve data over the socket connection
:return:
"""
response = self.s.recv(1024)
if len(response) == 0:
raise RuntimeError("socket connection broken")
return response
def disconnect(self):
"""
Closes the socket connection
:return:
"""
try:
self.s.close()
except OSError as error:
print("Disconnecting OS error: {0}".format(error))
return

163
Robot/UR/URModbusServer.py Normal file
View File

@ -0,0 +1,163 @@
from Communication.ModbusTCP import ModbusTCP
import time, math, struct
# The robot controller acts as a Modbus TCP server (port 502),
# clients can establish connections to it and send standard MODBUS requests to it.
# - Note that some Modbus device manufacturers use the terms Master (client) and Slave (server).
# Typically the external IO device is going to be a server and the robot is going to behave as the client
# (requesting and consuming messages from the server). Master=client and Slave=server.
# However, note that the UR controller can be both a server and a client
# - Note that all values are unsigned, if you want to convert to signed integers,
# program "if (val > 32768): val = val - 65535".
#
# - The MODBUS Server has 0-based addressing.
# Be aware that some other devices are 1-based (e.g. Anybus X-gateways), then just add one to the address
# on that device. (e.g. address 3 on the robot will be address 4 on the Anybus X-gateway)
class URModbusServer:
"""Give read and write access to data in the robot controller for other devices
An interface for communicating with the modbus TCP server (port 502) on the UR.
Defines functions for retrieving information from the controller.
Information will be re-requested if an error occurs.
All information will be formatted to human readable information.
"""
def __init__(self, host):
"""
:param host: IP address to connect with
"""
self.modbusTCP = ModbusTCP(host, 502)
def get_tcp_position(self):
"""
Connects with the Modbus server to requests Cartesian data of the TCP
:return: Readable cartesian data of TCP, vector in mm, axis in radials
"""
packet = self.modbusTCP.read_holding_registers(400, quantity=6)
if packet is None:
time.sleep(0.5)
print("Modbus Error: retrying")
return self.get_tcp_position()
else:
x = self._format(packet[9:11]) / 10
y = self._format(packet[11:13]) / 10
z = self._format(packet[13:15]) / 10
rx = self._format(packet[15:17]) / 1000
ry = self._format(packet[17:19]) / 1000
rz = self._format(packet[19:21]) / 1000
return x, y, z, rx, ry, rz
def get_joint_angles(self, tries=0) -> tuple:
"""
Connects with the Modbus server to requests the angles of each joint, in radians
:return: Readable angle values of each joint in radials
"""
if tries>10:
print("Modbus Error: Failed")
return 0, 0, 0, 0, 0, 0
packet = self.modbusTCP.read_holding_registers(270, quantity=6)
packet_signs = self.modbusTCP.read_holding_registers(320, quantity=6)
if (packet is None) or (packet_signs is None):
time.sleep(0.01)
print(f"Modbus Error #{tries}: retrying")
return self.get_joint_angles(tries+1)
else:
#print(f"Packet: {packet_signs.hex()}")
base = self._format_sign(packet[9:11], packet_signs[9:11])
shoulder = self._format_sign(packet[11:13], packet_signs[11:13])
elbow = self._format_sign(packet[13:15], packet_signs[13:15])
wrist_1 = self._format_sign(packet[15:17], packet_signs[15:17])
wrist_2 = self._format_sign(packet[17:19], packet_signs[17:19])
wrist_3 = self._format_sign(packet[19:21], packet_signs[19:21])
return base, shoulder, elbow, wrist_1, wrist_2, wrist_3
def get_joint_angles_degrees(self, tries=0):
angles = self.get_joint_angles(tries)
return tuple([round(math.degrees(angle), 3) for angle in angles])
def get_joint_speeds(self, tries=0):
"""
Connects with the Modbus server to requests the speed of each joint, in radians
:return: Readable angle values of each joint in radials
"""
if tries>10:
print("Modbus Error: Failed")
return 0, 0, 0, 0, 0, 0
packet = self.modbusTCP.read_holding_registers(280, quantity=6)
if (packet is None):
time.sleep(0.01)
print(f"Modbus Error #{tries}: retrying")
return self.get_joint_speeds(tries+1)
else:
#print(f"Packet: {packet_signs.hex()}")
base = self._format(packet[9:11]) / 1000
shoulder = self._format(packet[11:13]) / 1000
elbow = self._format(packet[13:15]) / 1000
wrist_1 = self._format(packet[15:17]) / 1000
wrist_2 = self._format(packet[17:19]) / 1000
wrist_3 = self._format(packet[19:21]) / 1000
return base, shoulder, elbow, wrist_1, wrist_2, wrist_3
@staticmethod
def _format(d):
"""Formats signed integers to unsigned float
:param d: signed integer to format
:return: unsigned float
"""
d = d.hex()
d_i = int(d, 16)
d_f = 0
if d_i < 32768:
d_f = float(d_i)
if d_i > 32767:
d_i = 65535 - d_i
d_f = float(d_i) * -1
return d_f
@staticmethod
def _format_2(d):
"""Formats signed integers to signed float
:param d: signed integer to format
:return: signed float
"""
d = d.hex()
d_i = int(d, 16)
d_f = float(d_i)
return d_f
@staticmethod
def _format_sign(d, sign):
"""
:param d:
:param sign:
:return:
"""
sign = sign.hex()
sign = int(sign, 16)
d = d.hex()
d_i = int(d, 16)
d_f = float(d_i) / 1000
if sign == 0:
return d_f
else:
d_f = round(d_f - math.radians(360), 3)
return d_f

160
Robot/UR/URRobot.py Normal file
View File

@ -0,0 +1,160 @@
from Communication.SocketConnection import SocketConnection
from Robot.UR.URModbusServer import URModbusServer
from Robot.UR.URScript import URScript
class URRobot:
"""
Interface for communicating with the UR Robot
SecondaryPort used for sending commands
ModbusServer used for retrieving info
"""
def __init__(self, host):
self.secondaryPort = 30002
self.secondaryInterface = SocketConnection(host, self.secondaryPort)
self.secondaryInterface.connect()
self.URModbusServer = URModbusServer(host)
self.URScript = URScript()
# Max safe values of acceleration and velocity are 0.4
# DO NOT USE THE FOLLOWING VALUES
# MAX a=1.3962634015954636
# MAX v=1.0471975511965976
# These are the absolute max for 'safe' movement and should only be used
# if you know what your are doing, otherwise you WILL damage the UR internally
self.acceleration = 0.1
self.velocity = 0.1
def movel(self, pose, a=0.1, v=0.1, joint_p=False):
"""Move to position (linear in tool-space)
See :class:`URScript` for detailed information
"""
script = URScript.movel(pose, a, v, joint_p=joint_p).encode()
return self._send_script(script)
def movej(self, q, a=0.1, v=0.1, joint_p=True):
"""Move to position (linear in joint-space)
See :class:`URScript` for detailed information
"""
script = URScript.movej(q, a, v, joint_p=joint_p).encode()
return self._send_script(script)
def speedj(self, qd, a=0.5, t=0):
"""Accelerate linearly in joint space and continue with constant joint speed.
The time t is optional; if provided the function will return after time t, regardless of the target speed has been reached.
If thetime t is not provided, the function will return when the target speed is reached.
See :class:`URScript` for detailed information
"""
script = URScript.speedj(qd, a, t).encode()
return self._send_script(script)
def stopj(self, a=1.5):
"""Stop (linear in joint space)
See :class:`URScript` for detailed information
"""
script = URScript.stopj(a).encode()
return self._send_script(script)
def set_tcp(self, pose):
"""Set the Tool Center Point
See :class:`URScript` for detailed information
"""
script = URScript.set_tcp(pose).encode()
return self._send_script(script)
def get_tcp_position(self):
""" Get TCP position
Will return values as seen on the teaching pendant (300.0mm)
:return: 6 Floats - Position data of TCP (x, y, z) in mm (Rx, Ry, Rz) in radials
"""
position_data = self.URModbusServer.get_tcp_position()
return position_data
def get_joint_angles(self):
""" Get joint angles
:return: 6 Floats - Position joint angles (base, shoulder, elbow, wrist_1, wrist_2, wrist_3) in radians
"""
position_data = self.URModbusServer.get_joint_angles()
return position_data
def get_joint_angles_degrees(self):
""" Get joint angles in degrees
:return: 6 Floats - Position joint angles (base, shoulder, elbow, wrist_1, wrist_2, wrist_3) in degrees
"""
position_data = self.URModbusServer.get_joint_angles_degrees()
return position_data
def get_joint_speeds(self):
""" Get joint speeds in rads/s
:return: 6 Floats - Position joint speeds (base, shoulder, elbow, wrist_1, wrist_2, wrist_3) in rads/s
"""
speed_data = self.URModbusServer.get_joint_speeds()
return speed_data
def set_io(self, io, value):
"""
Set the specified IO
:param io: The IO to set as INT
:param value: Boolean to enable or disable IO
:return: Boolean to check if the command has been send
"""
script = "set_digital_out({}, {})".format(io, value) + "\n"
script = script.encode()
return self._send_script(script)
def translate(self, vector, a=0.1, v=0.1):
""" Move TCP based on its current position
Example:
If the TCP position was: 0.4, 0.5, 0.5 and the vector value passed is 0.1, 0.0, 0.0.
It will attempt to translate the TCP to 0.5, 0.5, 0.5
:param vector: the X, Y, Z to translate to
:param a: tool acceleration [m/2^s]
:param v: tool speed [m/s]
:return: Boolean to check if the command has been send
"""
tcp_pos = list(self.get_tcp_position())
tcp_pos[0] = tcp_pos[0] / 1000 + vector[0]
tcp_pos[1] = tcp_pos[1] / 1000 + vector[1]
tcp_pos[2] = tcp_pos[2] / 1000 + vector[2]
return self.movel(tcp_pos, a, v)
def _send_script(self, _script):
""" Send URScript to the UR controller
:param _script: formatted script to send
:return: Boolean to check if the script has been send
"""
try:
self.secondaryInterface.send(_script)
except OSError as error:
print("OS error: {0}".format(error))
return False
return True
@ staticmethod
def format_cartesian_data(cartesian_data):
"""
Formats the vector from human readable mm to robot readable mm
A position of 300mm becomes 0.3
:param cartesian_data: data to format
:return: formatted data
"""
i = 0
for position in cartesian_data:
cartesian_data[position] /= (1000 if i < 3 else 1)
i += 1
formatted_cartesian_data = cartesian_data
return formatted_cartesian_data

142
Robot/UR/URScript.py Normal file
View File

@ -0,0 +1,142 @@
# SW Version 1.8 - URScript manual:
# When connected URScript programs or commands are sent in clear text on the socket.
# Each line is terminated by \n
# python 3 must use bytes (b"") to send the string
# When using URScript the robot moves according to the base plane.
# Joint positions (q) and joint speeds (qd) are represented directly as lists of 6 Floats, one for each robot joint.
# Tool poses (x) are represented as poses also consisting of 6 Floats.
# In a pose, the first 3 coordinates is a position vector and the last 3 an axis-angle
class URScript:
""" Defines functions for use with the URRobot.
Based on URScript manual version 1.8
"""
__version__ = 1.8
@staticmethod
def movec(pose_via, pose_to, a=0.1, v=0.1, r=0, joint_p=False):
"""Move Circular: Move to position (circular in tool-space)
TCP moves on the circular arc segment from current pose, through pose_via to pose_to.
Accelerates to and moves with constant tool speed v
:param pose_via: path point (note: only position is used).
(pose_via can also be specified as joint positions, then forward kinematics is used to
calculate the corresponding pose)
:param pose_to: target pose (pose_to can also be specified as joint positions, then
forward kinematics is used to calculate the corresponding pose
:param a: tool acceleration [m/2^s]
:param v: tool speed [m/s]
:param r: blend radius (of target pose) [m]
:param joint_p: if True, poses are specified as joint positions
:return: string containing the movec script
"""
prefix = "" if joint_p else "p"
return"movec({}[{}, {}, {}, {}, {}, {}], {}[{}, {}, {}, {}, {}, {}], " \
"a={}, v={}, r={})".format(prefix, *pose_via, prefix, *pose_to, a, v, r) + "\n"
@staticmethod
def movej(q, a=0.1, v=0.1, t=0, r=0, joint_p=False):
"""Move to position (linear in joint-space)
When using this command, the
robot must be at standstill or come from a movej or movel with a blend.
The speed and acceleration parameters controls the trapezoid speed
profile of the move. The $t$ parameters can be used in stead to set the
time for this move. Time setting has priority over speed and acceleration
settings. The blend radius can be set with the $r$ parameters, to avoid
the robot stopping at the point. However, if he blend region of this mover
overlaps with previous or following regions, this move will be skipped, and
an Overlapping Blends warning message will be generated.
:param q: joint positions (q can also be specified as a pose, the inverse
kinematics is used to calculate the corresponding joint positions)
:param a: joint acceleration of leading axis [rad/sˆ2]
:param v: joint speed of leading axis [rad/s]
:param t: time [S]
:param r: blend radius [m]
:param joint_p: if True, poses are specified as joint positions
:return: string containing the movej script
"""
prefix = "" if joint_p else "p"
return "movej({}[{}, {}, {}, {}, {}, {}], a={}, v={}, t={}, r={})".format(prefix, *q, a, v, t, r) + "\n"
@staticmethod
def movel(pose, a=0.1, v=0.1, t=0, r=0, joint_p=False):
"""Move to position (linear in tool-space)
:param pose: target pose (pose can also be specified as joint positions,
then forward kinematics is used to calculate the corresponding pose)
:param a: tool acceleration [m/2^s]
:param v: tool speed [m/s]
:param t: time [S]
:param r: blend radius [m]
:param joint_p: if True, poses are specified as joint positions
:return: string containing the movel script
"""
prefix = "" if joint_p else "p"
return "movel({}[{}, {}, {}, {}, {}, {}], a={}, v={}, t={}, r={})".format(prefix, *pose, a, v, t, r) + "\n"
@staticmethod
def movep(pose, a=0.1, v=0.1, t=0, r=0, joint_p=False):
"""Move Process
Blend circular (in tool-space) and move linear (in tool-space) to position.
Accelerates to and moves with constant tool speed v.
:param pose: target pose (pose can also be specified as joint positions,
then forward kinematics is used to calculate the corresponding pose
:param a: tool acceleration [m/2^s]
:param v: tool speed [m/s]
:param t: time [S]
:param r: blend radius [m]
:param joint_p: if True, poses are specified as joint positions
:return: string containing the movep script
"""
prefix = "" if joint_p else "p"
return "movep({}[{}, {}, {}, {}, {}, {}], a={}, v={}, t={}, r={})" .format(prefix, *pose, a, v, t, r) + "\n"
@staticmethod
def speedj(qd, a=0.1, t=0):
"""Accelerate linearly in joint space and continue with constant joint speed.
The time t is optional; if provided the function will return after time t, regardless of the target speed has been reached.
If the time t is not provided, the function will return when the target speed is reached.
:param qd: joint speeds [rad/s]
:param a: joint acceleration [rad/sˆ2]
:param t: time [S]. Optional. Will not consider if equals or bellow 0
:return: string containing the speedj script
"""
speedtime = "" if t <= 0 else ", t=" + str(t)
return "speedj([{}, {}, {}, {}, {}, {}], a={}{})".format(*qd, a, speedtime) + "\n"
@staticmethod
def set_tcp(pose):
"""Set the Tool Center Point
Sets the transformation from the output flange coordinate system to the TCP as a pose.
:param pose: A pose representing the offset in x, y z and the rotation of the tcp around x, y and z
:return: A string containing the set tcp script
"""
return "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*pose) + "\n"
@staticmethod
def stopj(a=1.5):
"""Stop (linear in joint space)
Decelerate joint speeds to zero
:param a: joint acceleration [rad/sˆ2] (of leading axis)
:return: String containing the stopj script
"""
return "stopj({})".format(a) + "\n"
@staticmethod
def stopl(a=0.5):
"""Stop (linear in tool space)
Decelerate tool speeds to zero
:param a: joint acceleration [rad/sˆ2] (of leading axis)
:return: String containing the stopj script
"""
return "stopl({})".format(a) + "\n"

175
URSentry.py Normal file
View File

@ -0,0 +1,175 @@
from Robot.UR.URRobot import URRobot
from time import sleep
import math
import threading
class URSentry:
def __init__(self, host):
self.robot = URRobot(host)
self.sentry_pose = [1.571, -1.571, 0.785, -0.785, -1.571, 1.318]
self.forward_pose = [1.571, -1.949, 1.974, -2.844, -1.571, 1.318]
self.robot_speed = [0, 0, 0, 0, 0, 0]
self.detections = []
# Flags
self.await_stop = False
self.await_stop_ticks = 0
self.send_to_zero_on_stop = False
# Smooth stopping
self.smooth_stop_delayed_call = None
def get_joint_angles(self) -> "list[float]":
"""
Get the current joint angles of the robot in degrees
"""
return self.robot.get_joint_angles_degrees()
def sentry_position(self, a=0.5, v=1.5):
self.robot.movej(self.sentry_pose, a=0.5, v=1.5)
def forward_position(self, a=0.5, v=1.5):
self.robot.movej(self.forward_pose, a, v)
def update_detections(self, detections):
self.detections = detections
def forward_position_to_base_angle_degrees(self, base_angle, a=0.5, v=1.5):
pose = self.forward_pose
pose[0] = math.radians(base_angle)
self.robot.movej(pose, a, v)
def set_base_speed(self, speed):
self.robot_speed[0] = speed
def set_neck_speed(self, speed):
self.robot_speed[4] = speed
def smooth_stop(self):
# print("Smooth stopping --------------------")
self.robot.speedj([0, 0, 0, 0, 0, 0], 1.5)
def lerp(self, a, b, t):
return a + t * (b - a)
def move_robot_base(self, joystick_pos_x: float):
"""
Move the robot based on a joystick input,
where the the center is (0, 0) and the bottom right corner is (1, 1).
Horizontally, the speed of the base is adjusted on how far left or right the detection is.
We take into account the maximum rotation limit, trying to slow down when nearing +-360 degrees,
and trying to never reach that point. If needed, the robot should attempt to do a full rotation the other way to keep following
the target.
This can be changed so we first change the horizontal "neck" (wrist_2) until it reaches a limit, and then rotate the base,
which would result in a more natural movement
Vertically, the speed of the vertical "neck" (wrist_1) is adjusted based on how far up or down the detection is.
We limit it's rotation to +- 45 degrees, as we do not need more to follow the target.
This can be changed in the future to acomplish more natural movements, by also adjusting the speed of the shoulder joint.
Movement is done entirely through speed control, which should result in a more fluid movement compared to pose control.
If flag await_stop is set True, we check if the current speed is 0 wait until all joints are stopped before moving again.
"""
# We omit all processing until the robot has stopped moving for a certain amount of calls ('ticks')
# print("Base Speed: ", self.robot_speed[0])
ticks_to_wait = 10
if self.await_stop:
joint_speeds = self.robot.get_joint_speeds()
if all([speed == 0 for speed in joint_speeds]):
self.await_stop_ticks += 1
if self.await_stop_ticks > ticks_to_wait:
self.await_stop = False
self.await_stop_ticks = 0
return
if self.send_to_zero_on_stop:
self.await_stop = True
self.send_to_zero_on_stop = False
self.forward_position_to_base_angle_degrees(0)
# Cancel smooth stopping due to no input
if self.smooth_stop_delayed_call is not None:
self.smooth_stop_delayed_call.cancel()
self.smooth_stop_delayed_call = None
# # Center detections, so that the center of the image is (0, 0)
# # We also invert the y axis, so that the upper part of the image is positive
# # Thus: lower left corner is (-500, -500) and upper right corner is (500, 500)
# joystick_pos_x -= 500
# joystick_pos_y -= 500
# joystick_pos_y = -joystick_pos_y
# Deadzones where we still consider the target in the middle, to avoid jittering
horizontal_dead_zone_radius = 0.1
vertical_dead_zone_radius = 0.1
# Speeds for the base and neck joints
base_max_speed = 0.8
base_min_speed = 0.3
base_acceleration = 1.5
# Maximum and minimum angles
base_max_angle = 315
base_danger_angle = 340
if abs(joystick_pos_x) < horizontal_dead_zone_radius:
# We are in the deadzone, so we do not need to move the base, or stop moving it
if self.robot_speed[0] == 0:
return
self.set_base_speed(0)
else:
current_pose = self.get_joint_angles()
base_angle = current_pose[0]
# print("Base angle: ", base_angle)
# Check if we are past the maximum angle
if abs(base_angle) > base_max_angle:
# We are past the maximum angle, so we undo a rotation by sending to 0
self.send_to_zero_on_stop = True
self.await_stop = True
self.set_base_speed(0)
self.robot.speedj(self.robot_speed, base_acceleration, 1)
return
# We are not in the deadzone, so we need to move the base
# Calculate the speed based on the distance from the center
direction = math.copysign(1, joystick_pos_x)
base_speed = (
self.lerp(base_min_speed, base_max_speed, abs(joystick_pos_x))
* direction
)
self.set_base_speed(base_speed)
# Schedule smooth stop if no input is given for a second
self.smooth_stop_delayed_call = threading.Timer(
0.4, self.move_robot_base, args=[0]
)
self.smooth_stop_delayed_call.start()
# print("Robot speed: ", self.robot_speed)
self.robot.speedj(self.robot_speed, base_acceleration, 1)
if __name__ == "__main__":
host = "172.22.114.160"
sentry = URSentry(host)
# sentry.sentry_position()
# sleep(3)
sentry.forward_position()
sleep(3)
sentry.robot.speedj([0.8, 0, 0, 0, 0, 0], 0.5, 3)
sleep(2)
sentry.smooth_stop()
sleep(2)
sentry.forward_position_to_base_angle_degrees(45)
# while True:
# print(sentry.robot.get_joint_speeds()[0])

75
UnifiWebsockets/Unifi.py Normal file
View File

@ -0,0 +1,75 @@
import asyncio
import queue
import json
import requests
import websockets
import ssl
import re
import dotenv
# from UnifiWebsockets.decode import decode_packet
# REST login script to get the CSRF token
def get_token(base_url, username, password):
login_url = f"{base_url}/api/auth/login"
payload = {"username": username, "password": password}
headers = {"Content-Type": "application/json"}
try:
session = requests.Session()
response = session.post(login_url, json=payload, headers=headers, verify=False)
response.raise_for_status() # Raise an exception for HTTP errors
return session.cookies
except requests.exceptions.RequestException as e:
print(f"Failed to obtain token: {e}")
return None, None
# Function to connect to the websocket and print the event stream
async def listen_to_event_stream(ws_url, cookies, queue):
cookie_header = "".join([f"{key}={value}" for key, value in cookies.items()])
headers = {"Cookie": cookie_header}
ssl_context = ssl._create_unverified_context() # Create an unverified SSL context
try:
async with websockets.connect(
ws_url, extra_headers=headers, ssl=ssl_context
) as websocket:
while True:
try:
message = await websocket.recv()
message = str(message)
queue.put(find_coords(message))
# print(find_coords(message))
# coords_begin = message.find('coord') + 7
# coords_end = message[coords_begin:].find(']')
# print(message[coords_begin:])
# print(decode_packet(message))
except websockets.ConnectionClosed:
print("Connection closed")
break
except Exception as e:
print(f"WebSocket connection failed: {e}")
def find_coords(message: str) -> list[str]:
coords_begin = [m.start() + 8 for m in re.finditer('"coord":', message)]
coords_end = [m.start() + 1 for m in re.finditer('],"depth":', message)]
coords = [
json.loads(message[coords_begin[i] : coords_end[i]])
for i in range(len(coords_begin))
]
return coords
def run(q: queue.Queue) -> None:
base_url = "https://172.22.114.176" # Replace with your UniFi Protect base URL
username = "engr-ugaif" # Replace with your username
password = dotenv.get_key(dotenv.find_dotenv(), "UNIFI_PASSWORD")
ws_url = "wss://172.22.114.176/proxy/protect/ws/liveDetectTrack?camera=668daa1e019ce603e4002d31" # Replace with your WebSocket URL
cookies = get_token(base_url, username, password)
asyncio.run(listen_to_event_stream(ws_url, cookies, q))

108
UnifiWebsockets/decode.py Normal file
View File

@ -0,0 +1,108 @@
import zlib
import struct
import json
from typing import Tuple, Union
# Constants
EVENT_PACKET_HEADER_SIZE = 8
# Packet Types
HEADER = 1
PAYLOAD = 2
# Payload Types
JSON_TYPE = 1
STRING_TYPE = 2
BUFFER_TYPE = 3
def decode_packet(packet: bytes) -> Union[Tuple[dict, Union[dict, str, bytes]], None]:
try:
data_offset = struct.unpack('>I', packet[ProtectEventPacketHeader.PAYLOAD_SIZE:ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0] + EVENT_PACKET_HEADER_SIZE
if len(packet) != (data_offset + EVENT_PACKET_HEADER_SIZE + struct.unpack('>I', packet[data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE:data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0]):
raise ValueError("Packet length doesn't match header information.")
except Exception as error:
print(f"Error decoding update packet: {error}")
return None
header_frame = decode_frame(packet[:data_offset], HEADER)
payload_frame = decode_frame(packet[data_offset:], PAYLOAD)
if not header_frame or not payload_frame:
return None
return header_frame, payload_frame
def decode_frame(packet: bytes, packet_type: int) -> Union[dict, str, bytes, None]:
frame_type = packet[ProtectEventPacketHeader.TYPE]
if packet_type != frame_type:
return None
payload_format = packet[ProtectEventPacketHeader.PAYLOAD_FORMAT]
is_deflated = packet[ProtectEventPacketHeader.DEFLATED]
if is_deflated:
payload = zlib.decompress(packet[EVENT_PACKET_HEADER_SIZE:])
else:
payload = packet[EVENT_PACKET_HEADER_SIZE:]
if frame_type == HEADER:
if payload_format == JSON_TYPE:
return json.loads(payload.decode('utf-8'))
else:
return None
if payload_format == JSON_TYPE:
return json.loads(payload.decode('utf-8'))
elif payload_format == STRING_TYPE:
return payload.decode('utf-8')
elif payload_format == BUFFER_TYPE:
return payload
else:
print(f"Unknown payload packet type received in the realtime events API: {payload_format}")
return None
def decode_packet_from_mac(packet: bytes, mac: str) -> Union[Tuple[dict, Union[dict, str, bytes]], None]:
try:
data_offset = struct.unpack('>I', packet[ProtectEventPacketHeader.PAYLOAD_SIZE:ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0] + EVENT_PACKET_HEADER_SIZE
if len(packet) != (data_offset + EVENT_PACKET_HEADER_SIZE + struct.unpack('>I', packet[data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE:data_offset + ProtectEventPacketHeader.PAYLOAD_SIZE + 4])[0]):
raise ValueError("Packet length doesn't match header information.")
except Exception as error:
print(f"Error decoding update packet: {error}")
return None
header_frame = decode_frame(packet[:data_offset], HEADER)
if not header_frame:
return None
# Check if modelKey is 'camera' and mac matches
if header_frame.get('modelKey') != 'camera' or header_frame.get('mac') != mac:
return None
payload_frame = decode_frame(packet[data_offset:], PAYLOAD)
if not payload_frame:
return None
return header_frame, payload_frame
class ProtectEventPacketHeader:
TYPE = 0
PAYLOAD_FORMAT = 1
DEFLATED = 2
UNKNOWN = 3
PAYLOAD_SIZE = 4
# Example usage:
# packet = b'...' # binary string from websocket
# mac_address = 'XX:XX:XX:XX:XX:XX' # Replace with the desired MAC address
# result = decode_packet_from_mac(packet, mac_address)
# if result:
# action_frame, data_frame = result
# print("Action Frame:", action_frame)
# print("Data Frame:", data_frame)
# else:
# print("No matching packet found.")

65
main.py Normal file
View File

@ -0,0 +1,65 @@
from UnifiWebsockets import Unifi
from URSentry import URSentry
import queue
import time
import threading
import BBoxProcessor
q = queue.Queue()
x = threading.Thread(target=Unifi.run, args=(q,))
x.start()
b = BBoxProcessor.BBoxProcessor()
ur = URSentry("172.22.114.160")
joystick_queue = queue.Queue()
class CameraJoystick(threading.Thread):
def __init__(self, joystick_queue: queue.Queue[list[int]]):
threading.Thread.__init__(self)
self.joystick_queue = joystick_queue
self.keep_running = True
def run(self):
while self.keep_running:
bb = b.get_joystick_position_from_new_set_of_bboxes(q.get())
print(bb)
self.joystick_queue.put(bb)
def stop(self):
self.keep_running = False
current_joystick = [0, 0]
class JoystickScheduler(threading.Thread):
def __init__(self, joystick_queue: queue.Queue[list[int]]):
threading.Thread.__init__(self)
self.joystick_queue = joystick_queue
self.keep_running = True
def run(self):
while self.keep_running:
global current_joystick
current_joystick = self.joystick_queue.get()
def stop(self):
self.keep_running = False
camera_joystick = CameraJoystick(joystick_queue)
camera_joystick.start()
joystick_scheduler = JoystickScheduler(joystick_queue)
joystick_scheduler.start()
ur.await_stop = True
ur.forward_position()
while True:
ur.move_robot_base(-current_joystick[0])
print("Setting robot base velocity to: ", current_joystick[0])
time.sleep(0.2)