secure_command.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. #!/usr/bin/env python3
  2. '''
  3. perform a secure parameter change on an ArduRemoteID node via DroneCAN
  4. user must supply a private key corresponding to one of the public keys on the node
  5. '''
  6. import dronecan, time, sys, random, base64, struct
  7. from dronecan import uavcan
  8. try:
  9. import monocypher
  10. except ImportError:
  11. print("Please install monocypher with: python3 -m pip install pymonocypher==3.1.3.2")
  12. sys.exit(1)
  13. # get command line arguments
  14. from argparse import ArgumentParser
  15. parser = ArgumentParser(description='secure_command')
  16. parser.add_argument("--bitrate", default=1000000, type=int, help="CAN bit rate")
  17. parser.add_argument("--node-id", default=100, type=int, help="local CAN node ID")
  18. parser.add_argument("--target-node", default=None, type=int, help="target node ID")
  19. parser.add_argument("--private-key", default=None, type=str, help="private key file")
  20. parser.add_argument("--bus-num", default=1, type=int, help="MAVCAN bus number")
  21. parser.add_argument("--signing-passphrase", help="MAVLink2 signing passphrase", default=None)
  22. parser.add_argument("--timeout", help="DroneCAN message timeout", type=float, default=3)
  23. parser.add_argument("uri", default=None, type=str, help="CAN URI")
  24. parser.add_argument("paramop", default=None, type=str, help="parameter operation")
  25. args = parser.parse_args()
  26. should_exit = False
  27. if args.target_node is None:
  28. print("Must specify target node ID")
  29. should_exit = True
  30. if args.private_key is None:
  31. print("Must specify private key file")
  32. should_exit = True
  33. if should_exit:
  34. sys.exit(1)
  35. SECURE_COMMAND_GET_REMOTEID_SESSION_KEY = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_GET_REMOTEID_SESSION_KEY
  36. SECURE_COMMAND_SET_REMOTEID_CONFIG = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_SET_REMOTEID_CONFIG
  37. session_key = None
  38. sequence = random.randint(0, 0xFFFFFFFF)
  39. last_session_key_req = 0
  40. last_set_config = 0
  41. # Initializing a DroneCAN node instance.
  42. node = dronecan.make_node(args.uri, node_id=args.node_id, bitrate=args.bitrate)
  43. node.can_driver.set_bus(args.bus_num)
  44. if args.signing_passphrase is not None:
  45. node.can_driver.set_signing_passphrase(args.signing_passphrase)
  46. # Initializing a node monitor
  47. node_monitor = dronecan.app.node_monitor.NodeMonitor(node)
  48. def get_session_key_response(reply):
  49. if not reply:
  50. print("Session key timed out")
  51. return
  52. global session_key
  53. session_key = bytearray(reply.response.data)
  54. print("Got session key")
  55. def get_private_key():
  56. '''get private key, return 32 byte key or None'''
  57. if args.private_key is None:
  58. return None
  59. try:
  60. d = open(args.private_key,'r').read()
  61. except Exception as ex:
  62. return None
  63. ktype = "PRIVATE_KEYV1:"
  64. if not d.startswith(ktype):
  65. return None
  66. return base64.b64decode(d[len(ktype):])
  67. def make_signature(seq, command, data):
  68. '''make a signature'''
  69. private_key = get_private_key()
  70. d = struct.pack("<II", seq, command)
  71. d += data
  72. if command != SECURE_COMMAND_GET_REMOTEID_SESSION_KEY:
  73. if session_key is None:
  74. print("No session key")
  75. raise Exception("No session key")
  76. d += session_key
  77. return monocypher.signature_sign(private_key, d)
  78. def request_session_key():
  79. '''request a session key'''
  80. global sequence
  81. sig = make_signature(sequence, SECURE_COMMAND_GET_REMOTEID_SESSION_KEY, bytes())
  82. node.request(dronecan.dronecan.remoteid.SecureCommand.Request(
  83. sequence=sequence,
  84. operation=SECURE_COMMAND_GET_REMOTEID_SESSION_KEY,
  85. sig_length=len(sig),
  86. data=sig),
  87. args.target_node,
  88. get_session_key_response,
  89. timeout=args.timeout)
  90. sequence = (sequence+1) % (1<<32)
  91. print("Requested session key")
  92. def config_change_response(reply):
  93. if not reply:
  94. print("Config change timed out")
  95. return
  96. result_map = {
  97. 0: "ACCEPTED",
  98. 1: "TEMPORARILY_REJECTED",
  99. 2: "DENIED",
  100. 3: "UNSUPPORTED",
  101. 4: "FAILED" }
  102. result = result_map.get(reply.response.result, "invalid")
  103. print("Got change response: %s" % result)
  104. sys.exit(reply.response.result)
  105. def send_config_change():
  106. '''send remoteid config change'''
  107. global sequence
  108. req = args.paramop.encode('utf-8')
  109. sig = make_signature(sequence, SECURE_COMMAND_SET_REMOTEID_CONFIG, req)
  110. node.request(dronecan.dronecan.remoteid.SecureCommand.Request(
  111. sequence=sequence,
  112. operation=SECURE_COMMAND_SET_REMOTEID_CONFIG,
  113. sig_length=len(sig),
  114. data=req+sig),
  115. args.target_node,
  116. config_change_response,
  117. timeout=args.timeout)
  118. sequence = (sequence+1) % (1<<32)
  119. print("Requested config change")
  120. def update():
  121. now = time.time()
  122. global last_session_key_req, last_set_config, session_key
  123. if session_key is None and now - last_session_key_req > args.timeout+1:
  124. last_session_key_req = now
  125. request_session_key()
  126. if session_key is not None and now - last_set_config > args.timeout+1:
  127. last_set_config = now
  128. send_config_change()
  129. while True:
  130. try:
  131. update()
  132. node.spin(timeout=0.1)
  133. except Exception as ex:
  134. print(ex)