secure_command.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #!/usr/bin/env python3
  2. '''
  3. perform a secure parameter change on an ArduRemoteID node via DroneCAN
  4. user must supply a private key corresponding to one of the public keys on the node
  5. '''
  6. import dronecan, time, sys, random, base64, struct
  7. from dronecan import uavcan
  8. try:
  9. import monocypher
  10. except ImportError:
  11. print("Please install monocypher with: python3 -m pip install pymonocypher")
  12. sys.exit(1)
  13. # get command line arguments
  14. from argparse import ArgumentParser
  # Command line interface: optional CAN settings followed by two positionals,
  # the CAN URI and the parameter operation to send.
  parser = ArgumentParser(description='secure_command')
  parser.add_argument("--bitrate", type=int, default=1000000, help="CAN bit rate")
  parser.add_argument("--node-id", type=int, default=100, help="local CAN node ID")
  parser.add_argument("--target-node", type=int, default=None, help="target node ID")
  parser.add_argument("--private-key", type=str, default=None, help="private key file")
  parser.add_argument("--bus-num", type=int, default=1, help="MAVCAN bus number")
  parser.add_argument("uri", type=str, default=None, help="CAN URI")
  parser.add_argument("paramop", type=str, default=None, help="parameter operation")
  args = parser.parse_args()

  # --target-node and --private-key default to None but are mandatory; report
  # every missing one before exiting so both problems are shown in one run.
  should_exit = False
  for _supplied, _complaint in (
          (args.target_node, "Must specify target node ID"),
          (args.private_key, "Must specify private key file")):
      if _supplied is None:
          print(_complaint)
          should_exit = True
  if should_exit:
      sys.exit(1)
  # The operation codes are constants exposed on a SecureCommand request
  # object; read both from a single instance.
  _secure_request = dronecan.dronecan.remoteid.SecureCommand.Request()
  SECURE_COMMAND_GET_REMOTEID_SESSION_KEY = _secure_request.SECURE_COMMAND_GET_REMOTEID_SESSION_KEY
  SECURE_COMMAND_SET_REMOTEID_CONFIG = _secure_request.SECURE_COMMAND_SET_REMOTEID_CONFIG

  # Protocol state shared by the request functions, callbacks and update().
  session_key = None  # filled in by get_session_key_response()
  sequence = random.randint(0, 0xFFFFFFFF)  # random 32 bit starting sequence number
  last_session_key_req = last_set_config = 0  # time.time() of the latest attempts

  # Initializing a DroneCAN node instance.
  node = dronecan.make_node(args.uri, node_id=args.node_id, bitrate=args.bitrate)
  node.can_driver.set_bus(args.bus_num)

  # Initializing a node monitor
  node_monitor = dronecan.app.node_monitor.NodeMonitor(node)
  def get_session_key_response(reply):
      '''store the session key carried by a session key reply

      An empty reply means the request timed out; it is ignored so that
      update() can ask again later.
      '''
      global session_key
      if reply:
          session_key = bytearray(reply.response.data)
          print("Got session key")
  def get_private_key():
      '''get private key, return the decoded key bytes or None

      The file named by --private-key must contain the text "PRIVATE_KEYV1:"
      followed by the base64 encoded key (expected to be 32 bytes; the length
      is not checked here).

      Returns None when no key file was given, when the file cannot be read,
      or when the "PRIVATE_KEYV1:" prefix is missing.
      '''
      if args.private_key is None:
          return None
      try:
          # context manager so the file handle is always closed
          with open(args.private_key, 'r') as key_file:
              d = key_file.read()
      except (OSError, ValueError):
          # OSError: missing/unreadable file; ValueError: bad path or
          # contents that cannot be decoded as text
          return None
      ktype = "PRIVATE_KEYV1:"
      if not d.startswith(ktype):
          return None
      return base64.b64decode(d[len(ktype):])
  def make_signature(seq, command, data):
      '''make a signature

      The signed message is seq and command packed as two little-endian
      32 bit unsigned integers, followed by data and, for every command
      other than the session key request, the current session key.

      Returns the value of monocypher.signature_sign() for that message.

      Raises RuntimeError if the private key could not be loaded, or if a
      session key is required but none has been received yet.
      '''
      private_key = get_private_key()
      if private_key is None:
          # fail with a clear message rather than handing None to monocypher
          raise RuntimeError("Unable to load private key")
      d = struct.pack("<II", seq, command)
      d += data
      if command != SECURE_COMMAND_GET_REMOTEID_SESSION_KEY:
          if session_key is None:
              print("No session key")
              raise RuntimeError("No session key")
          d += session_key
      return monocypher.signature_sign(private_key, d)
  def request_session_key():
      '''sign and send a session key request to the target node

      The reply is delivered to get_session_key_response().
      '''
      global sequence
      operation = SECURE_COMMAND_GET_REMOTEID_SESSION_KEY
      signature = make_signature(sequence, operation, b"")
      request = dronecan.dronecan.remoteid.SecureCommand.Request(
          sequence=sequence,
          operation=operation,
          sig_length=len(signature),
          data=signature)
      node.request(request, args.target_node, get_session_key_response)
      # advance the sequence number, wrapping at 32 bits
      sequence = (sequence + 1) % (1 << 32)
      print("Requested session key")
  def config_change_response(reply):
      '''report the answer to a config change request and exit

      An empty reply means the request timed out and is ignored. Otherwise
      the result name is printed and the process exits with the numeric
      result as its exit status.
      '''
      if reply:
          code = reply.response.result
          # result names indexed by their numeric value
          names = dict(enumerate((
              "ACCEPTED",
              "TEMPORARILY_REJECTED",
              "DENIED",
              "UNSUPPORTED",
              "FAILED")))
          print("Got change response: %s" % names.get(code, "invalid"))
          sys.exit(code)
  def send_config_change():
      '''sign and send the requested parameter operation to the target node

      The payload is the UTF-8 encoded paramop argument followed by its
      signature. The reply is delivered to config_change_response().
      '''
      global sequence
      operation = SECURE_COMMAND_SET_REMOTEID_CONFIG
      payload = args.paramop.encode('utf-8')
      signature = make_signature(sequence, operation, payload)
      request = dronecan.dronecan.remoteid.SecureCommand.Request(
          sequence=sequence,
          operation=operation,
          sig_length=len(signature),
          data=payload + signature)
      node.request(request, args.target_node, config_change_response)
      # advance the sequence number, wrapping at 32 bits
      sequence = (sequence + 1) % (1 << 32)
      print("Requested config change")
  def update():
      '''send whichever request is due

      While there is no session key, one is requested; once a key is held,
      the config change is sent. Each request is repeated only when more
      than 2 seconds have passed since the previous attempt of that kind.
      '''
      global last_session_key_req, last_set_config, session_key
      retry_interval = 2.0
      now = time.time()
      if session_key is None:
          if now - last_session_key_req > retry_interval:
              last_session_key_req = now
              request_session_key()
      if session_key is not None:
          if now - last_set_config > retry_interval:
              last_set_config = now
              send_config_change()
  # Main loop: issue any request that is due, then let the node process CAN
  # traffic for up to 0.1 seconds.
  while True:
      try:
          update()
          node.spin(timeout=0.1)
      except Exception as err:
          # Report the problem and keep running. SystemExit is not a subclass
          # of Exception, so this handler does not swallow it.
          print(err)