import numpy import serial import time import numpy as np from dynamixel_sdk import * # Uses Dynamixel SDK library from numpy import * import matplotlib.pyplot as plt import matplotlib.animation as animation import serial import sympy as sym from sympy.abc import s,t,x,y,z from sympy.integrals import inverse_laplace_transform ''' motor ID and correspond control position(6,000) # Value up: Thumb flexion position(7,00) # Value up: Thumb flexion position(11,000) # Value down: Thumb extension position(14,2000) # Value down: Thumb abduction position(8,0000) # Value up: index flexion position(9,0000) # Value up: index flexion position(12,4000) # Value down: index extension position(13,4000) # Value down: index extension ''' # Control table address ADDR_MX_TORQUE_ENABLE = 24 # Control table address is different in Dynamixel model ADDR_MX_GOAL_POSITION = 30 # The control address for MX12-W in on: http://emanual.robotis.com/docs/en/dxl/mx/mx-12w/#control-table-of-eeprom-area AddR_MX_GOAL_TORQUE = 34 ADDR_MX_GOAL_SPEED = 32 ADDR_MX_PRESENT_POSITION = 36 #show the present position ADDR_MX_PRESENT_LOAD = 40 #show the present load # Protocol version PROTOCOL_VERSION = 1.0 # See which protocol version is used in the Dynamixel # Default setting DXL_ID = 1 # Dynamixel ID : 1 BAUDRATE = 1000000 # Dynamixel default baudrate : 57600 DEVICENAME = 'COM8' # Check which port is being used on your controller # ex) Windows: "COM1" Linux: "/dev/ttyUSB0" Mac: "/dev/tty.usbserial-*" TORQUE_ENABLE = 1 # Value for enabling the torque TORQUE_DISABLE = 0 # Value for disabling the torque DXL_MINIMUM_POSITION_VALUE = 0 # Dynamixel will rotate between this value DXL_MAXIMUM_POSITION_VALUE = 4095 # and this value (note that the Dynamixel would not move when the position value is out of movable range. Check e-manual about the range of the Dynamixel you use.) DXL_MINIMUM_SPEED_VALUE = 0 DXL_MAXIMUM_SPEED_VALUE = 1023 DXL_MOVING_STATUS_THRESHOLD = 80 # Dynamixel moving status threshold ESC_ASCII_VALUE = 0x1b COMM_SUCCESS = 0 # Communication Success result value COMM_TX_FAIL = -1001 # Communication Tx Failed dxl_comm_result = COMM_TX_FAIL # Communication result dxl_goal_position = 1000 #input the target position pressure =0 #realtime pressure lastpressure=0 # record last time pressure newpressure=0 #record present pressure ss=0 #preset the init value to nerual signal output=0 n=800 #set the number of data you should store list1 = [0] * n #Store the pressure signal, #populate list, length n with n entries "None" list2 = [0] * n #Store the nerual signal, populate list, length n with n entries "None" list3 = [0] * n ##Store the target motor torque ,populate list, length n with n entries "None" list4 = [0] * n ##Store the target motor position ,populate list, length n with n entries "None" list5 = [0] * n ##Store the motor load ,populate list, length n with n entries "None" i = 1 #count the nerual signal numbers number=0 # count the neural spike number every 10 times list6 = [0] * 400 #store neural 10 spike number into a list targettorque = 0 #target torque motor control based on the neural spike dxl_present_load=0 #show the motor load # Initialize PortHandler instance # Set the port path # Get methods and members of PortHandlerLinux or PortHandlerWindows portHandler = PortHandler(DEVICENAME) # Initialize PacketHandler instance # Set the protocol version # Get methods and members of Protocol1PacketHandler or Protocol2PacketHandler packetHandler = PacketHandler(PROTOCOL_VERSION) #初始数据绘图 #line1:pressure #line2:neural signal #line3:Target motor torque #line4:Target motor position #line4:motor Load dis = list1 #STORE the pressure signal dis2 = list2 #Store the nerual signal dis3 = list3 #Store the target motor torque dis4 = list4 #Store the target motor position dis5 = list5 #Store the motor load fig, (ax1,ax2,ax3,ax4,ax5) = plt.subplots(5,1, sharex='none', sharey='none')# each row have different axis 4行1列 line1, = ax1.plot(dis)#show the pressure figure line2, = ax2.plot(dis2) #show the neural figure line3, = ax3.plot(dis3) #show the Target motor torque figure line4, = ax4.plot(dis4) #show the motor torque figure line5, = ax5.plot(dis5) #show the motor torque figure ax1.set_ylim(0, 255) ax1.set_xlim(0, 500) ax1.set_ylabel("Sensor Pressure") ax2.set_ylim(0, 1) ax2.set_xlim(0, 500) ax2.set_ylabel("Neural Signal") ax3.set_ylim(0, 1024) ax3.set_xlim(0, 500) ax3.set_ylabel("Target Torque") ax4.set_ylim(3000, 4000) ax4.set_xlim(0, 500) ax4.set_ylabel("Target Position") ax4.set_xlabel("Cycle") ax5.set_ylim(0, 2048) ax5.set_xlim(0, 500) ax5.set_ylabel("Motor Load") ax5.set_xlabel("Cycle") # open a csv file to record experiment data csv = open('Control-experiment.csv', 'w')# wite the csv file columnTitleRow = "Tactil-Pressure, Nerual-Signal, Target_Torque,Target_Position, Motor-Load\n" csv.write(columnTitleRow)#set the name tag for all colum #motor and serial port initialize class start: def motorinit(self): # Open port if portHandler.openPort(): print("Succeeded to open the port") else: print("Failed to open the port") print("Press any key to terminate...") quit() # Set port baudrate if portHandler.setBaudRate(BAUDRATE): print("Succeeded to change the baudrate") else: print("Failed to change the baudrate") print("Press any key to terminate...") quit() def serialinit(self): global ser # start read the port serial ser = serial.Serial(port='COM6', baudrate=115200, timeout=1) print("connected to: " + ser.portstr) #darw the animation figures def update(frame): dis1 = list1 dis2 = list2 dis3 = list3 dis4 = list4 dis5 = list5 # 绘图 line1.set_ydata(dis1) # draw the sensor pressure figure line2.set_ydata(dis2)#draw the nerual signal figure line3.set_ydata(dis3) # draw the target torque figure line4.set_ydata(dis4) # draw the target position figure line5.set_ydata(dis5) # draw the motor load figure global AA ''' # Store neural signals for i1 in range(0, 720): # push array to store 720 neural signal data list2[i1] = list2[i1 + 1] ''' for z in range(0, 179): # push array to store 179 neural signal data list6[z] = list6[z + 1] number=list6 def find(v, v_peak): index = [] for i in range(len(v[0])): if v[0][i] == v_peak: index.append(i) index_fire = np.asarray(index) return index_fire class nerual_caculation: global i, sa, list2, number, k, kk, kkk,mm,neuron def get_raster(): current = number current = np.transpose(current) current = current * 200 i_index_fire = np.zeros((20, 36), dtype=int) index = np.zeros((36, 10), dtype=int) o = 0 qqq = np.zeros((6, 6), dtype=int) while o < 36: _i = np.zeros((1, 10), dtype=float) _i[0] = current[o] _i = np.transpose(_i) * 15 step = 10 s1 = 0.1 s2 = 0.9 dt = 1 nt = 10 _c = 100 vr = -60 vt = -40 k = 0.7 a = 0.02 b = 0.2 c = -65 d = 6 v_peak = 35 t = np.zeros((1, nt), dtype=float) for i in range(nt): t[0][i] = dt * i v = np.ones((1, nt), dtype=float) * vr u = np.zeros((1, nt), dtype=float) spike_number = 0 for m in range(nt - 1): v_t = v[0][m] + (dt / 2) * (k * (v[0][m] - vr) * (v[0][m] - vt) - u[0][m] + _i[m][0]) / _c v[0][m + 1] = v_t * 2 - v[0][m] u[0][m + 1] = u[0][m] + dt * a * (b * (v[0][m] - vr) - u[0][m]) if v[0][m + 1] >= v_peak: v[0][m] = v_peak v[0][m + 1] = c u[0][m + 1] = u[0][m + 1] + d spike_number += 1 index_fire = find(v, v_peak) index_fire_2 = index_fire[1:] index_fire_1 = index_fire[0:-1] isi = (index_fire_2 - index_fire_1) * dt fire_rate = 2000 / isi f = np.mean(fire_rate) print(str(o + 1) + " Spike Number: " + str(spike_number)) x = np.zeros((1, 20), dtype=float) x2 = len(index_fire) i = 0 while i < x2 - 1: q = index_fire[i] x[0][q] = 1 i += 1 x = np.transpose(x) for i in range(len(i_index_fire)): i_index_fire[i][o] = x[i][0] i = int(o / 6) j = o % 6 qqq[i][j] = len(fire_rate) o += 1 raster = i_index_fire print("raster is: ") print(raster) return raster if __name__ == '__main__': raster = get_raster() xx = np.transpose(raster) t = np.zeros((1, 20), dtype=float) for i in range(20): t[0][i] = i + 1 m1 = len(xx) m2 = len(xx[0]) y = pow(2.71828, (-t / 12.5)) - pow(2.71828, (-t / 0.5)) _y = np.zeros((m1, m2), dtype=float) for i in range(m1): _y[i] = y k = np.zeros((m1, 2 * m2 - 1)) i = 0 while i < m1 - 1: x = np.convolve(xx[i], _y[i]) k[i] = x i += 1 i = 0 _r = np.zeros((205, 39), dtype=float) while i < 204: n = np.random.randint(0, 36, size=11) m = np.zeros((11, 39)) p = len(n) for j in range(11): m[j] = k[n[j]] r = np.sum(m, axis=0) _r[i] = r i += 1 mas = np.sum(_r, axis=1) va = np.max(mas) vb = [] for i in range(len(mas)): if va == mas[i]: vb.append(i) neuron = np.zeros((len(vb), 39), dtype=float) for i in range(len(vb)): neuron[i] = _r[0] ''' print(neuron) ''' mm=neuron U1 = mean(mm) / s ** 2 U2 = mean(mm) / s * 2 U3 = -mean(mm) / s ** 2 # Transfer function G1 = 0.0000009166 / (s ** 2 + 0.01108 * s + 0.00002499) G2 = 0.0000000098 / (s ** 3 + 0.01804*s** 2+0.0001457 * s + 0.000001583) # Calculate responses Y1 = G1 * U1 Y2 = G1 * U2 Y3 = G1* U3 # Inverse Laplace Transform u1 = inverse_laplace_transform(U1, s, t) y1 = inverse_laplace_transform(Y1, s, t) y2 = inverse_laplace_transform(Y2, s, t) y3 = inverse_laplace_transform(Y3, s, t) # generate data for plot tm = np.linspace(0, 1, 10) us = np.zeros(len(tm)) ys = np.zeros(len(tm)) yss = np.zeros(len(tm)) ysss = np.zeros(len(tm)) # substitute numeric values for u and y for u in [u1]: for i in range(len(tm)): us[i] += u.subs(t, tm[i]) for y in [y1]: for i in range(len(tm)): ys[i] += y.subs(t, tm[i]) ''' print(ys) ''' k = np.mean(ys) kk = np.mean(yss) kkk = np.mean(ysss) ''' print("pressure:", pressure, ", nerual signal: ", AA, ", Target Torque ", targettorque, ", Target Position",targetpositionreact, ", motor load: ",dxl_present_load, ) # show the pressure , neural signal and motor load ''' class lift_torque_calculation: def __init__(self,k): global lifttorque global liftspeed lifttorque = 300+ k * 20 #adjustable liftspeed = 100 class hold_torque_calculation: def __init__(self,kk): global targettorqueo,targetpositiono,reactspeedo,list3,list4 targettorqueo = 300+kkk * 100 targetpositiono = 3900 reactspeedo = 200 for i3 in range(0, 10): # push array to store 500 target motor torque data list3[i3] = list3[i3 + 1] list4[i3] = list4[i3 + 1] list3[10] = targettorqueo list4[10] = targetpositiono class react_torque_calculation: def __init__(self,k): global targettorque, targetpositionreact,reactspeed,list3,list4 targettorque = 300+k * 100 targetpositionreact = 4095 reactspeed = 500 for i3 in range(0, 10): # push array to store 500 target motor torque data list3[i3] = list3[i3 + 1] list4[i3] = list4[i3 + 1] list3[10] = targettorque list4[10] = targetpositionreact lift_torque_calculation(k) react_torque_calculation(kk) hold_torque_calculation(kkk) #read motor torque class react_torque_control: def motor(self): global defaultposition0 defaultposition0 = 2000 # adjustable # control motor ID, torque and position based on the Target_torque_calculation result motor_speed_control(8, targettorque, targetpositionreact,reactspeed) # Index motor_speed_control(9, targettorque, targetpositionreact,reactspeed) # Index motor_speed_control(6, targettorque, targetpositionreact,reactspeed) # Thumb motor_speed_control(7, targettorque, targetpositionreact,reactspeed) # Thumb motor_speed_control(16, targettorque, targetpositionreact,reactspeed) # Middle motor_speed_control(10, targettorque, targetpositionreact,reactspeed) # Middle motor_speed_control(2, targettorque, targetpositionreact, reactspeed) # Little motor_speed_control(1, targettorque, targetpositionreact, reactspeed) # Little motor_speed_control(3, targettorque, targetpositionreact, reactspeed) # Ring motor_speed_control(8, lifttorque, defaultposition0, liftspeed) # Index motor_speed_control(9, lifttorque, defaultposition0, liftspeed) # Index motor_speed_control(6, lifttorque, defaultposition0, liftspeed) # Thumb motor_speed_control(7, lifttorque, defaultposition0, liftspeed) # Thumb motor_speed_control(16, lifttorque, defaultposition0, liftspeed) # Middle motor_speed_control(10, lifttorque, defaultposition0, liftspeed) # Middle motor_speed_control(2, lifttorque, defaultposition0, liftspeed) # Little motor_speed_control(1, lifttorque, defaultposition0, liftspeed) # Little motor_speed_control(3, lifttorque, defaultposition0, liftspeed) # Ring motor_speed_control(8, targettorqueo, targetpositiono, reactspeedo) # Index motor_speed_control(9, targettorqueo, targetpositiono, reactspeedo) # Index motor_speed_control(6, targettorqueo, targetpositiono, reactspeedo) # Thumb motor_speed_control(7, targettorqueo, targetpositiono, reactspeedo) # Thumb motor_speed_control(16, targettorqueo, targetpositiono, reactspeedo) # Middle motor_speed_control(10, targettorqueo, targetpositiono, reactspeedo) # Middle motor_speed_control(2, targettorqueo, targetpositiono, reactspeedo) # Little motor_speed_control(1, targettorqueo, targetpositiono, reactspeedo) # Little motor_speed_control(3, targettorqueo, targetpositiono, reactspeedo) # Ring #lift the object class Load_lift: global defaultpositionlift defaultpositionlift = 4095 def position(self): #print(lifttorque) # control motor ID, torque and position based on the Target_torque_calculation result motor_speed_control(8, lifttorque, defaultpositionlift,liftspeed) # Index motor_speed_control(9, lifttorque, defaultpositionlift,liftspeed) # Index motor_speed_control(6, lifttorque, defaultpositionlift,liftspeed) # Thumb motor_speed_control(7, lifttorque, defaultpositionlift,liftspeed) # Thumb motor_speed_control(16, lifttorque, defaultpositionlift,liftspeed) # Middle motor_speed_control(10, lifttorque, defaultpositionlift,liftspeed) # Middle motor_speed_control(2, lifttorque, defaultpositionlift, liftspeed) # Little motor_speed_control(1, lifttorque, defaultpositionlift, liftspeed) # Little motor_speed_control(3, lifttorque, defaultpositionlift, liftspeed) # Ring class motor_control: def __init__(self,ID,torque,position): # Write torque dxl_comm_result, dxl_error = packetHandler.write2ByteTxRx(portHandler, ID, AddR_MX_GOAL_TORQUE, torque) if dxl_comm_result != COMM_SUCCESS: print("%s" % packetHandler.getTxRxResult(dxl_comm_result)) elif dxl_error != 0: print("%s" % packetHandler.getRxPacketError(dxl_error)) dxl_comm_result, dxl_error = packetHandler.write2ByteTxRx(portHandler, ID, ADDR_MX_GOAL_POSITION , position) if dxl_comm_result != COMM_SUCCESS: print("%s" % packetHandler.getTxRxResult(dxl_comm_result)) elif dxl_error != 0: print("%s" % packetHandler.getRxPacketError(dxl_error)) #motor torque/position/speed control class motor_speed_control: def __init__(self,ID,torque,position,speed): # Write torque dxl_comm_result, dxl_error = packetHandler.write2ByteTxRx(portHandler, ID, AddR_MX_GOAL_TORQUE, torque) if dxl_comm_result != COMM_SUCCESS: print("%s" % packetHandler.getTxRxResult(dxl_comm_result)) elif dxl_error != 0: print("%s" % packetHandler.getRxPacketError(dxl_error)) dxl_comm_result, dxl_error = packetHandler.write2ByteTxRx(portHandler, ID, ADDR_MX_GOAL_POSITION , position) if dxl_comm_result != COMM_SUCCESS: print("%s" % packetHandler.getTxRxResult(dxl_comm_result)) elif dxl_error != 0: print("%s" % packetHandler.getRxPacketError(dxl_error)) dxl_comm_result, dxl_error = packetHandler.write2ByteTxRx(portHandler, ID, ADDR_MX_GOAL_SPEED , speed) if dxl_comm_result != COMM_SUCCESS: print("%s" % packetHandler.getTxRxResult(dxl_comm_result)) elif dxl_error != 0: print("%s" % packetHandler.getRxPacketError(dxl_error)) #read motor torque class readtorque: def __init__(self,ID): global dxl_present_load,list5 # read torque dxl_present_load, dxl_comm_result, dxl_error = packetHandler.read2ByteTxRx(portHandler, 8, ADDR_MX_PRESENT_LOAD ) # 40 is the Present Load according to the motor control table if dxl_comm_result != COMM_SUCCESS: print("%s" % packetHandler.getTxRxResult(dxl_comm_result)) elif dxl_error != 0: print("%s" % packetHandler.getRxPacketError(dxl_error)) for i4 in range(0, 499): # push array to store 500 torque data list5[i4] = list5[i4 + 1] list5[499] = dxl_present_load #read the pressure from serial class getpressure: def read(self): while ser.in_waiting: message = ser.readline() decoded_bytes = message.decode(errors='ignore') string = decoded_bytes.rstrip() # remove \n and \r data = string.split(",") if len(data) == 1: global pressure try: pressure = int(data[0]) except ValueError: pass for i2 in range(0, 499): # push array to store 500 data of pressure list1[i2] = list1[i2 + 1] list1[499] = pressure #start the motor and serial connection control = start() control.motorinit() control.serialinit() #set the init motor position and the target move position initposition=0 #adjustable initial motor position initspeed=80 #adjustable initial motor speed motor_speed_control(9, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(8, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(7, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(6, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(10, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(1, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(2, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(3, 1000, initposition,initspeed) # input target Motor's ID,torque and position motor_speed_control(16, 1000, initposition,initspeed) # input target Motor's ID,torque and position while True: pressure_data=getpressure() pressure_data.read() #read sensor's pressure and global it for motor torque control nerual_caculation(pressure) # send the pressure data to calculate the neural signal, then calculate the target torque and position if pressure <= 120: lift_control = Load_lift() lift_control.position() # control the motor to hold the object if pressure > 120 and pressure <=125: react_control = react_torque_control() react_control.motor() # control the motor to conduct target torque and position based on the pressure if pressure > 125 : react_control = react_torque_control() react_control.motor() # control the motor to conduct target torque and position based readtorque(8) #read the load of motor #set the neural signal animation ani = animation.FuncAnimation(fig, update, frames=None, interval=40) plt.ion() plt.show() plt.pause(0.000000000001) # Close port portHandler.closePort()