Hello,
First of all, thank you so much for your help! Yes, let me share the following two scripts:
- This is the function calling groupSyncWrite
def set_command_position(id_position_list):
    """Send goal positions to several motors with a single sync-write packet.

    Args:
        id_position_list: Sequence of entries where ``entry[0]`` is the
            Dynamixel ID and ``entry[1]`` is the goal position handed to
            ``rad2tick`` (presumably radians -- confirm against rad2tick).

    Returns:
        True when the sync-write packet was transmitted successfully.
        False when not connected, when a parameter could not be added,
        or when the transmission reported an error.
    """
    if not connected:
        return False

    ADDR_PRO_GOAL_POSITION = 116  # Control table address
    LEN_PRO_GOAL_POSITION = 4  # Data length in bytes (1 byte = 8 bits = 2 hex digits)

    # Wait until the USB port is released.
    # NOTE(review): this is a busy-wait on the shared 'portOccupied' flag.
    while glob.read_one_data(glob.mem_ttyUSB_data, 'portOccupied')[0]:
        pass

    # Occupy the USB port.
    glob.mem_ttyUSB_data.set({'portOccupied': np.array([1.0])})
    lock.acquire()
    try:
        # Compose the sync-write package.
        groupSyncWrite = GroupSyncWrite(portHandler, packetHandler,
                                        ADDR_PRO_GOAL_POSITION, LEN_PRO_GOAL_POSITION)

        for entry in id_position_list:
            dxl_id = entry[0]
            dxl_goal_position = rad2tick(entry[1])

            # Split the goal position into four little-endian bytes.
            # A word is a 4-digit hex number (16 bits, 0 - 65535), so 65536
            # has loword=0 and hiword=1. LOBYTE/HIBYTE split a word into its
            # low and high 2-digit hex halves, expressed in decimal,
            # e.g. 2047 has LOBYTE=255 and HIBYTE=7 (255 + 7 * 16**2 = 2047).
            low_word = DXL_LOWORD(dxl_goal_position)
            high_word = DXL_HIWORD(dxl_goal_position)
            param_goal_position = [DXL_LOBYTE(low_word),
                                   DXL_HIBYTE(low_word),
                                   DXL_LOBYTE(high_word),
                                   DXL_HIBYTE(high_word)]

            # Add the goal position to the sync-write parameter storage.
            if not groupSyncWrite.addParam(dxl_id, param_goal_position):
                print("[ID:%03d] groupSyncWrite addparam failed" % dxl_id)
                return False

        # Transmit the sync-write packet.
        dxl_comm_result = groupSyncWrite.txPacket()
        if dxl_comm_result != COMM_SUCCESS:
            print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
            return False

        # Clear the sync-write parameter storage.
        groupSyncWrite.clearParam()
        return True
    finally:
        # Release the USB port flag and the lock on EVERY exit path,
        # including the early failure returns above; otherwise one failed
        # write would leave the port marked occupied and the lock held.
        try:
            glob.mem_ttyUSB_data.set({'portOccupied': np.array([0.0])})
        finally:
            lock.release()
- This is groupSyncWrite
#!/usr/bin/env python
# -*- coding: utf-8 -*-
################################################################################
# Copyright 2017 ROBOTIS CO., LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Author: Ryu Woon Jung (Leon)
from .robotis_def import *
class GroupSyncWrite:
    """Collect per-ID data and transmit it as one sync-write packet.

    Data for each ID is stored in ``data_dict`` and flattened into
    ``param`` as ``[id, data..., id, data..., ...]`` before transmission.
    """

    def __init__(self, port, ph, start_address, data_length):
        """Store the transport objects and the target register window.

        Args:
            port: Port object forwarded to ``ph.syncWriteTxOnly``.
            ph: Object providing ``syncWriteTxOnly`` (presumably the
                packet handler -- confirm against the caller).
            start_address: Start address forwarded to ``syncWriteTxOnly``.
            data_length: Maximum number of data items accepted per ID.
        """
        self.port = port
        self.ph = ph
        self.start_address = start_address
        self.data_length = data_length

        self.is_param_changed = False
        self.param = []
        self.data_dict = {}

        self.clearParam()

    def makeParam(self):
        """Rebuild ``self.param`` from ``self.data_dict``.

        Does nothing when no data is stored. Stops early when an ID has
        empty data, in which case ``self.param`` holds only the entries
        appended before that ID.
        """
        if not self.data_dict:
            return

        self.param = []

        for dxl_id, data in self.data_dict.items():
            if not data:
                return

            self.param.append(dxl_id)
            self.param.extend(data)

    def addParam(self, dxl_id, data):
        """Register ``data`` for a new ``dxl_id``.

        Returns:
            False when the ID is already registered or ``data`` is longer
            than ``data_length``; True otherwise.
        """
        if dxl_id in self.data_dict:  # dxl_id already exists
            return False

        if len(data) > self.data_length:  # input data is longer than set
            return False

        self.data_dict[dxl_id] = data

        self.is_param_changed = True
        return True

    def removeParam(self, dxl_id):
        """Remove the data stored for ``dxl_id``; no-op when it is unknown."""
        if dxl_id not in self.data_dict:  # does not exist
            return

        del self.data_dict[dxl_id]

        self.is_param_changed = True

    def changeParam(self, dxl_id, data):
        """Replace the data stored for an already registered ``dxl_id``.

        Returns:
            False when the ID is not registered or ``data`` is longer
            than ``data_length``; True otherwise.
        """
        if dxl_id not in self.data_dict:  # does not exist
            return False

        if len(data) > self.data_length:  # input data is longer than set
            return False

        self.data_dict[dxl_id] = data

        self.is_param_changed = True
        return True

    def clearParam(self):
        """Drop the stored data for every ID."""
        self.data_dict.clear()

    def txPacket(self):
        """Transmit the stored data through ``ph.syncWriteTxOnly``.

        Returns:
            ``COMM_NOT_AVAILABLE`` when no data is stored; otherwise the
            value returned by ``syncWriteTxOnly``.
        """
        if not self.data_dict:
            return COMM_NOT_AVAILABLE

        if self.is_param_changed or not self.param:
            self.makeParam()

        # NOTE: a retry loop around syncWriteTxOnly was tried here to work
        # around input/output errors; it is disabled and a single attempt
        # is made.
        return self.ph.syncWriteTxOnly(self.port, self.start_address, self.data_length, self.param,
                                       len(self.data_dict) * (1 + self.data_length))
I apologize in advance, since the code is a bit difficult to read here. Ideally I would upload the files directly, but this community website does not allow me to do so…
I’m looking forward to hearing from you!
Best,