基於ROS和python,經過TCP通訊協議,完成鍵盤無線控制移動機器人運動

1、所需工具包

 1.ROS鍵盤包:teleop_twist_keyboard 

 2.TCP通信包:socket

 

  •     $ cd ~/catkin_ws/src
  •     $ git clone https://github.com/Forrest-Z/teleop_twist_keyboard.git
  •     $ catkin_make

 3.在ubuntu的ros中創建一個ros_car_py包:

 

  •     $ cd ~/catkin_ws/src
  •     $ catkin_create_pkg ros_car_py roscpp rospy std_msgs

4.新建 base 文件:

  •     $ cd catkin_ws/src/base
  •     $ mkdir src 
  •     $ vim src/base.py

代碼以下(ROS做爲TCP服務器):

#!/usr/bin/env python
    # coding=utf-8
    import rospy
    from socket import *
    import time
    from threading import Thread
    from std_msgs.msg import String
    from geometry_msgs.msg import Twist
    msg_list = []
    def callback(cmd_input, Socket):
        print("-----服務器已經啓動成功,準備接收數據-----")
        Socket.settimeout(5)
        recvdata = Socket.recv(4096)
        t = Twist()
        t.angular.z = cmd_input.angular.z
        t.linear.x = cmd_input.linear.x
        left_speed = t.linear.x - t.angular.z * 0.5 * 0.2
        right_speed = t.linear.x + t.angular.z * 0.5 * 0.2
        left_speed *= 1000
        right_speed *= 1000
        left_speed = str(left_speed)
        right_speed = str(right_speed)
        # msg_list.append(left_speed)
        # msg_list.append(right_speed)
        print("left_speed=%s" % left_speed)
        print("right_speed=%s" % right_speed)
        if len(recvdata) != 0:
          print("-----接收到數據-----")
          print("recvdata:%s" % recvdata)
          # Socket.send(b"hello beaglebone")
          # Socket.send(b"左輪速度")
          Socket.send(left_speed.encode("utf-8"))
          # Socket.send(b"右輪速度")
          Socket.send(right_speed.encode("utf-8"))
          # Socket.send(msg_list)
        else:
          print('-----未接收到客戶端數據,可能鏈接已經斷開-----')
          # Socket.send(b'client off')
          # 數據中斷時進行服務重啓程序,先close 再accept等待從新連線
          # 能夠防止出現當client意外終止致使server的中斷(Broken pipe錯誤)
          print('-----正在從新創建鏈接-----')
          Socket.close()
        Socket, clientInfo = serverSocket.accept()
        # serverSocket.close()
    def main():
        rospy.init_node("base")
        serverSocket = socket(AF_INET, SOCK_STREAM)
        serverSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        serverSocket.bind(('', 8899))
        serverSocket.listen(5)
        print("-----服務器正在啓動-----")
        Socket, clientInfo = serverSocket.accept()
        sub = rospy.Subscriber("cmd_vel", Twist, callback, Socket)
        rate = rospy.Rate(10)
        rospy.spin()
    if __name__ == "__main__":
        main()

  

注意事項:

 

  •     ROS中的python是python2,使用python3會出錯,因此須要在開頭加上#!/usr/bin/env python
  •     編寫好python程序後,編譯成功可是沒法運行,報錯Couldn't find executable named XXX.py,沒法執行
  • 問題緣由:
  •     文件沒有執行權限
  • 解決辦法:
  •     給文件添加執行權限
  •     命令:chmod +x base.py

修改CMakeList.txt:

 

  1.     cmake_minimum_required(VERSION 2.8.3)
  2.     project(ros_car_py)
  3.     find_package(catkin REQUIRED COMPONENTS
  4.       roscpp
  5.       rospy
  6.       std_msgs
  7.       message_generation
  8.     )
  9.     add_message_files(
  10.       FILES
  11.       MsgCar.msg
  12.     )
  13.     generate_messages(
  14.       DEPENDENCIES
  15.       std_msgs
  16.     )
  17.     catkin_package(
  18.     #  INCLUDE_DIRS include
  19.     #  LIBRARIES ros_car_py
  20.       CATKIN_DEPENDS message_runtime
  21.     #roscpp rospy std_msgs
  22.     #  DEPENDS system_lib
  23.     )
  24.     include_directories(
  25.     # include
  26.       ${catkin_INCLUDE_DIRS}
  27.     )

單獨編譯ros_car_pkg包:

  •         $ catkin_make -DCATKIN_WHITELIST_PACKAGES='ros_car_py'

2、控制原理:

  •     當咱們按下鍵盤時,teleop_twist_keyboard 包會發布 /cmd_vel 發佈速度主題
  •     在 base 節點訂閱這個話題,接收速度數據,轉換成字符串(TCP只容許發送字符串),而後發送至客戶端

設置beaglebone做爲客戶端:

運行代碼加載設備樹並讀串口數據用於控制PWM,進而控制小車運動

from socket import *
    import time
    SLOTS = "/sys/devices/bone_capemgr.9/slots"
    p1_duty = "/sys/devices/ocp.3/pwm_test_P9_16.16/duty"
    p2_duty = "/sys/devices/ocp.3/pwm_test_P8_13.15/duty"
    p1_period = "/sys/devices/ocp.3/pwm_test_P9_16.16/period"
    p2_period = "/sys/devices/ocp.3/pwm_test_P8_13.15/period"
    p1_run = "/sys/devices/ocp.3/pwm_test_P9_16.16/run"
    p2_run = "/sys/devices/ocp.3/pwm_test_P8_13.15/run"
    p1_export = "/sys/class/gpio/export"
    p2_export = "/sys/class/gpio/export"
    p1_direction = "/sys/class/gpio/gpio44/direction"
    p2_direction = "/sys/class/gpio/gpio45/direction"
    p1_polarity = "/sys/class/gpio/gpio44/value"
    p2_polarity = "/sys/class/gpio/gpio45/value"
    msg_lists = []
    clientSocket = socket(AF_INET, SOCK_STREAM)
    clientSocket.connect(("192.168.1.151", 8899))
    try:
        with open(SLOTS, "a") as f:
          f.write("am33xx_pwm")
        with open(SLOTS, "a") as f:
          f.write("bone_pwm_P9_22")
        with open(SLOTS, "a") as f:
          f.write("bone_pwm_P9_16")
        with open(p1_export, "a") as f:
          f.write("44")
        with open(p2_export, "a") as f:
          f.write("45")
        with open(p1_direction, "a") as f:
          f.write("in")
        with open(p2_direction, "a") as f:
          f.write("in")
    except:
        pass
    while True:
        clientSocket.send(b"hello ROS")
        msg_list1 = clientSocket.recv(1024)
        msg_list2 = clientSocket.recv(1024)
        if len(msg_list1) or len(msg_list2) > 0:
        msg_lists.append(msg_list1)
        msg_lists.append(msg_list2)
        for msg in msg_lists:
            print("recvData:%s" % msg)
        if msg_lists[0] == '500.0' and msg_lists[1] == '500.0':
            msg_lists = []
            print("succes")
            try:
                with open(p1_period, "a") as f:
                    f.write("500000")
                with open(p1_duty, "a") as f:
                    f.write("250000")
                with open(p2_period, "a") as f:
                    f.write("500000")
                with open(p2_duty, "a") as f:
                    f.write("250000")
                with open(p1_run, "a") as f:
                    f.write("1")
                with open(p2_run, "a") as f:
                    f.write("1")
                with open(p1_polarity, "a") as f:
                    f.write("1")
                with open(p2_polarity, "a") as f:
                    f.write("1")
            except:
                pass
        elif msg_lists[0] == '400.0' and msg_lists[1] == '600.0':
            msg_lists = []
            try:
                with open(p1_period, "a") as f:
                    f.write("400000")
                with open(p1_duty, "a") as f:
                    f.write("200000")
                with open(p2_period, "a") as f:
                    f.write("600000")
                with open(p2_duty, "a") as f:
                    f.write("300000")
                with open(p1_run, "a") as f:
                    f.write("1")
                with open(p2_run, "a") as f:
                    f.write("1")
                with open(p1_polarity, "a") as f:
                    f.write("1")
                with open(p2_polarity, "a") as f:
                    f.write("1")
            except:
                pass
        elif msg_lists[0] == '600.0' and msg_lists[1] == '400.0':
            msg_lists = []
            try:
                with open(p1_period, "a") as f:
                    f.write("600000")
                with open(p1_duty, "a") as f:
                    f.write("300000")
                with open(p2_period, "a") as f:
                    f.write("400000")
                with open(p2_duty, "a") as f:
                    f.write("200000")
                with open(p1_run, "a") as f:
                    f.write("1")
                with open(p2_run, "a") as f:
                    f.write("1")
                with open(p1_polarity, "a") as f:
                    f.write("1")
                with open(p2_polarity, "a") as f:
                    f.write("1")
            except:
                pass
        elif msg_lists[0] == '-500.0' and msg_lists[1] == '-500.0':
            msg_lists = []
            try:
                with open(p1_period, "a") as f:
                    f.write("500000")
                with open(p1_duty, "a") as f:
                    f.write("250000")
                with open(p2_period, "a") as f:
                    f.write("500000")
                with open(p2_duty, "a") as f:
                    f.write("250000")
                with open(p1_run, "a") as f:
                    f.write("1")
                with open(p2_run, "a") as f:
                    f.write("1")
                with open(p1_polarity, "a") as f:
                    f.write("0")
                with open(p2_polarity, "a") as f:
                    f.write("0")
            except:
                pass
        elif msg_lists[0] == '-600.0' and msg_lists[1] == '-400.0':
            msg_lists = []
            try:
                with open(p1_period, "a") as f:
                    f.write("600000")
                with open(p1_duty, "a") as f:
                    f.write("300000")
                with open(p2_period, "a") as f:
                    f.write("400000")
                with open(p2_duty, "a") as f:
                    f.write("200000")
                with open(p1_run, "a") as f:
                    f.write("1")
                with open(p2_run, "a") as f:
                    f.write("1")
                with open(p1_polarity, "a") as f:
                    f.write("0")
                with open(p2_polarity, "a") as f:
                    f.write("0")
            except:
                pass
        elif msg_lists[0] == '-400.0' and msg_lists[1] == '-600.0':
            msg_lists = []
            try:
                with open(p1_period, "a") as f:
                    f.write("400000")
                with open(p1_duty, "a") as f:
                    f.write("200000")
                with open(p2_period, "a") as f:
                    f.write("600000")
                with open(p2_duty, "a") as f:
                    f.write("300000")
                with open(p1_run, "a") as f:
                    f.write("1")
                with open(p2_run, "a") as f:
                    f.write("1")
                with open(p1_polarity, "a") as f:
                    f.write("0")
                with open(p2_polarity, "a") as f:
                    f.write("0")
            except:
                pass
        elif msg_lists[0] == '0.0' and msg_lists[1] == '0.0':
            msg_lists = []
            try:
                with open(p1_run, "a") as f:
                    f.write("0")
                with open(p2_run, "a") as f:
                    f.write("0")
            except:
                pass
        else:
            pass
        else:
        time.sleep(0.1)
        clientSocket = socket(AF_INET, SOCK_STREAM)
        clientSocket.connect(("192.168.1.138", 8899))
相關文章
相關標籤/搜索