secure_command.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #!/usr/bin/env python3
  2. '''
  3. perform a secure parameter change on an ArduRemoteID node via DroneCAN
  4. user must supply a private key corresponding to one of the public keys on the node
  5. '''
  6. import dronecan, time, sys, random, base64, struct
  7. from dronecan import uavcan
  8. try:
  9. import monocypher
  10. except ImportError:
  11. print("Please install monocypher with: python3 -m pip install pymonocypher")
  12. sys.exit(1)
  13. # get command line arguments
  14. from argparse import ArgumentParser
  15. parser = ArgumentParser(description='secure_command')
  16. parser.add_argument("--bitrate", default=1000000, type=int, help="CAN bit rate")
  17. parser.add_argument("--node-id", default=100, type=int, help="local CAN node ID")
  18. parser.add_argument("--target-node", default=None, type=int, help="target node ID")
  19. parser.add_argument("--private-key", default=None, type=str, help="private key file")
  20. parser.add_argument("--bus-num", default=1, type=int, help="MAVCAN bus number")
  21. parser.add_argument("--signing-passphrase", help="MAVLink2 signing passphrase", default=None)
  22. parser.add_argument("uri", default=None, type=str, help="CAN URI")
  23. parser.add_argument("paramop", default=None, type=str, help="parameter operation")
  24. args = parser.parse_args()
  25. should_exit = False
  26. if args.target_node is None:
  27. print("Must specify target node ID")
  28. should_exit = True
  29. if args.private_key is None:
  30. print("Must specify private key file")
  31. should_exit = True
  32. if should_exit:
  33. sys.exit(1)
  34. SECURE_COMMAND_GET_REMOTEID_SESSION_KEY = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_GET_REMOTEID_SESSION_KEY
  35. SECURE_COMMAND_SET_REMOTEID_CONFIG = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_SET_REMOTEID_CONFIG
  36. session_key = None
  37. sequence = random.randint(0, 0xFFFFFFFF)
  38. last_session_key_req = 0
  39. last_set_config = 0
  40. # Initializing a DroneCAN node instance.
  41. node = dronecan.make_node(args.uri, node_id=args.node_id, bitrate=args.bitrate)
  42. node.can_driver.set_bus(args.bus_num)
  43. if args.signing_passphrase is not None:
  44. node.can_driver.set_signing_passphrase(args.signing_passphrase)
  45. # Initializing a node monitor
  46. node_monitor = dronecan.app.node_monitor.NodeMonitor(node)
  47. def get_session_key_response(reply):
  48. if not reply:
  49. # timed out
  50. return
  51. global session_key
  52. session_key = bytearray(reply.response.data)
  53. print("Got session key")
  54. def get_private_key():
  55. '''get private key, return 32 byte key or None'''
  56. if args.private_key is None:
  57. return None
  58. try:
  59. d = open(args.private_key,'r').read()
  60. except Exception as ex:
  61. return None
  62. ktype = "PRIVATE_KEYV1:"
  63. if not d.startswith(ktype):
  64. return None
  65. return base64.b64decode(d[len(ktype):])
  66. def make_signature(seq, command, data):
  67. '''make a signature'''
  68. private_key = get_private_key()
  69. d = struct.pack("<II", seq, command)
  70. d += data
  71. if command != SECURE_COMMAND_GET_REMOTEID_SESSION_KEY:
  72. if session_key is None:
  73. print("No session key")
  74. raise Exception("No session key")
  75. d += session_key
  76. return monocypher.signature_sign(private_key, d)
  77. def request_session_key():
  78. '''request a session key'''
  79. global sequence
  80. sig = make_signature(sequence, SECURE_COMMAND_GET_REMOTEID_SESSION_KEY, bytes())
  81. node.request(dronecan.dronecan.remoteid.SecureCommand.Request(
  82. sequence=sequence,
  83. operation=SECURE_COMMAND_GET_REMOTEID_SESSION_KEY,
  84. sig_length=len(sig),
  85. data=sig),
  86. args.target_node,
  87. get_session_key_response)
  88. sequence = (sequence+1) % (1<<32)
  89. print("Requested session key")
  90. def config_change_response(reply):
  91. if not reply:
  92. # timed out
  93. return
  94. result_map = {
  95. 0: "ACCEPTED",
  96. 1: "TEMPORARILY_REJECTED",
  97. 2: "DENIED",
  98. 3: "UNSUPPORTED",
  99. 4: "FAILED" }
  100. result = result_map.get(reply.response.result, "invalid")
  101. print("Got change response: %s" % result)
  102. sys.exit(reply.response.result)
  103. def send_config_change():
  104. '''send remoteid config change'''
  105. global sequence
  106. req = args.paramop.encode('utf-8')
  107. sig = make_signature(sequence, SECURE_COMMAND_SET_REMOTEID_CONFIG, req)
  108. node.request(dronecan.dronecan.remoteid.SecureCommand.Request(
  109. sequence=sequence,
  110. operation=SECURE_COMMAND_SET_REMOTEID_CONFIG,
  111. sig_length=len(sig),
  112. data=req+sig),
  113. args.target_node,
  114. config_change_response)
  115. sequence = (sequence+1) % (1<<32)
  116. print("Requested config change")
  117. def update():
  118. now = time.time()
  119. global last_session_key_req, last_set_config, session_key
  120. if session_key is None and now - last_session_key_req > 2.0:
  121. last_session_key_req = now
  122. request_session_key()
  123. if session_key is not None and now - last_set_config > 2.0:
  124. last_set_config = now
  125. send_config_change()
  126. while True:
  127. try:
  128. update()
  129. node.spin(timeout=0.1)
  130. except Exception as ex:
  131. print(ex)