You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
260 lines
14 KiB
260 lines
14 KiB
|
|
import sys |
|
import json |
|
import aiohttp |
|
from flask import Blueprint |
|
from flask import current_app |
|
from flask import request |
|
from flask.json import jsonify |
|
from werkzeug.exceptions import abort |
|
|
|
from capsulflask.shared import my_exec_info_message, authorized_as_hub |
|
|
|
bp = Blueprint("spoke", __name__, url_prefix="/spoke") |
|
|
|
@bp.route("/heartbeat", methods=("POST",))
def heartbeat():
    """Handle POST /spoke/heartbeat by calling back to the hub's heartbeat URL.

    When authorized_as_hub() rejects the request headers, the request is
    aborted with 401. Otherwise the hub's
    ``/hub/heartbeat/<SPOKE_HOST_ID>`` URL is called through the configured
    HTTP_CLIENT, authenticating with SPOKE_HOST_TOKEN as a Bearer token.

    The hub's status code is mapped as follows:
      * -1    -> abort 503 (logged as "timed out or cannot be reached")
      * 401   -> abort 502 (hub rejected our token)
      * other non-200 -> abort 502
      * 200   -> the literal body "OK"
    """
    # Guard clause: refuse anything that is not authorized as the hub.
    if not authorized_as_hub(request.headers):
        current_app.logger.info("/spoke/heartbeat returned 401: invalid hub token")
        return abort(401, "invalid hub token")

    config = current_app.config
    url = f"{config['HUB_URL']}/hub/heartbeat/{config['SPOKE_HOST_ID']}"
    bearer = f"Bearer {config['SPOKE_HOST_TOKEN']}"
    response = config['HTTP_CLIENT'].do_http_sync(
        url,
        body=None,
        authorization_header=bearer,
    )

    if response.status_code == -1:
        current_app.logger.info(
            f"/spoke/heartbeat returned 503: hub at {url} timed out or cannot be reached"
        )
        return abort(503, "Service Unavailable: hub timed out or cannot be reached")

    if response.status_code == 401:
        current_app.logger.info(
            f"/spoke/heartbeat returned 502: hub at {url} rejected our token"
        )
        return abort(502, "hub rejected our token")

    if response.status_code != 200:
        current_app.logger.info(
            f"/spoke/heartbeat returned 502: hub at {url} returned {response.status_code}"
        )
        return abort(502, "Bad Gateway: hub did not return 200")

    return "OK"
|
|
|
@bp.route("/operation/<int:operation_id>", methods=("POST",))
def operation_with_id(operation_id: int):
    """Handle POST /spoke/operation/<operation_id>.

    The integer id captured from the URL is handed to operation_impl(),
    whose return value becomes the response.
    """
    response = operation_impl(operation_id)
    return response
|
|
|
@bp.route("/operation", methods=("POST",))
def operation_without_id():
    """Handle POST /spoke/operation (no operation id in the URL).

    Delegates to operation_impl() with None in place of an operation id.
    """
    no_operation_id = None
    return operation_impl(no_operation_id)
|
|
|
def operation_impl(operation_id: int):
    """Authorize a hub request, decode its body and dispatch to a handler.

    The body must decode to a JSON object with a ``type`` property naming
    one of the handlers below. The selected handler is called as
    ``handler(operation_id, request_body)`` and its return value is the
    response.

    Args:
        operation_id: id taken from the URL, or None when the request was
            made to the route without an id.

    Returns:
        Whatever the selected handler returns. If the handler raises an
        unexpected exception, a JSON body with ``error_message`` is
        returned instead.

    Raises:
        HTTPException (via abort): 401 when authorized_as_hub() rejects the
        request headers; 400 when the body is not valid JSON, has no
        ``type`` or an unknown ``type``. HTTP errors raised by a handler's
        own abort() calls are re-raised unchanged.
    """
    # Function-scope import so this block stands alone; werkzeug is already
    # a dependency of this module (abort is imported from it).
    from werkzeug.exceptions import HTTPException

    if not authorized_as_hub(request.headers):
        current_app.logger.warning("/hosts/operation returned 401: invalid hub token")
        return abort(401, "invalid hub token")

    # The original code ran json.loads() on request.json, i.e. it expects the
    # body to be a JSON *string* that itself contains JSON. Keep supporting
    # that, but also accept an already-decoded body instead of crashing.
    request_body = request.json
    if isinstance(request_body, str):
        try:
            request_body = json.loads(request_body)
        except ValueError:
            error_message = "request body is not valid json"
            current_app.logger.error(f"/hosts/operation returned 400: {error_message}")
            return abort(400, f"bad request; {error_message}")

    handlers = {
        "capacity_avaliable": handle_capacity_avaliable,
        "get": handle_get,
        "get_all_by_id": handle_get_all_by_id,
        "create": handle_create,
        "destroy": handle_destroy,
        "vm_state_command": handle_vm_state_command,
        "net_set_dhcp": handle_net_set_dhcp,
    }
    types_csv = ", ".join(handlers.keys())

    if not (isinstance(request_body, dict) and 'type' in request_body):
        error_message = "'type' json property is required"
    elif not isinstance(request_body['type'], str) or request_body['type'] not in handlers:
        # The isinstance check keeps an unhashable 'type' (e.g. a list) from
        # raising TypeError during the dict lookup.
        error_message = f"'type' must be one of {types_csv}"
    else:
        try:
            return handlers[request_body['type']](operation_id, request_body)
        except HTTPException:
            # abort() inside a handler signals a deliberate HTTP error
            # (400/503); let it reach the client instead of turning it into
            # a 200 response with an error_message body.
            raise
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            current_app.logger.error(
                f"unhandled exception in {request_body['type']} handler: {error_message}"
            )
            return jsonify(dict(error_message=error_message))

    current_app.logger.error(f"/hosts/operation returned 400: {error_message}")
    return abort(400, f"bad request; {error_message}")
|
|
|
def handle_capacity_avaliable(operation_id, request_body):
    """Answer a ``capacity_avaliable`` operation.

    Requires ``additional_ram_bytes`` in request_body (400 otherwise) and
    passes it to SPOKE_MODEL.capacity_avaliable(). The result is returned
    as JSON under ``capacity_avaliable`` together with
    ``assignment_status="assigned"``. operation_id is not used.
    """
    if 'additional_ram_bytes' in request_body:
        spoke_model = current_app.config['SPOKE_MODEL']
        requested_bytes = request_body['additional_ram_bytes']
        has_capacity = spoke_model.capacity_avaliable(requested_bytes)
        return jsonify(dict(assignment_status="assigned", capacity_avaliable=has_capacity))

    current_app.logger.error(
        "/hosts/operation returned 400: additional_ram_bytes is required for capacity_avaliable"
    )
    return abort(400, "bad request; additional_ram_bytes is required for capacity_avaliable")
|
|
|
def handle_get(operation_id, request_body):
    """Answer a ``get`` operation by looking up one VM on this spoke.

    Args:
        operation_id: unused by this handler.
        request_body: decoded request dict. ``id`` is required;
            ``get_ssh_host_keys`` is optional and defaults to False.

    Returns:
        JSON with ``assignment_status="assigned"`` only when
        SPOKE_MODEL.get() returns None; otherwise the same status plus the
        VM's id, host, state, ipv4, ipv6 and ssh_host_keys attributes.

    Raises:
        HTTPException (via abort): 400 when ``id`` is missing.
    """
    if 'id' not in request_body:
        current_app.logger.error("/hosts/operation returned 400: id is required for get")
        return abort(400, "bad request; id is required for get")

    # Previously read with request_body['get_ssh_host_keys'], which raised
    # KeyError when the caller omitted it even though only 'id' is validated.
    # False matches the value handle_destroy passes for the same argument.
    get_ssh_host_keys = request_body.get('get_ssh_host_keys', False)

    vm = current_app.config['SPOKE_MODEL'].get(request_body['id'], get_ssh_host_keys)
    if vm is None:
        return jsonify(dict(assignment_status="assigned"))

    return jsonify(dict(
        assignment_status="assigned",
        id=vm.id,
        host=vm.host,
        state=vm.state,
        ipv4=vm.ipv4,
        ipv6=vm.ipv6,
        ssh_host_keys=vm.ssh_host_keys,
    ))
|
|
|
def handle_get_all_by_id(operation_id, request_body) -> dict:
    """Answer a ``get_all_by_id`` operation.

    Returns the result of SPOKE_MODEL.get_all_by_id() as JSON under ``vms``
    with ``assignment_status="assigned"``. Neither argument is used.

    NOTE(review): the annotation says ``dict`` but the value returned is
    whatever jsonify() produces -- confirm and correct the annotation.
    """
    spoke_model = current_app.config['SPOKE_MODEL']
    all_vms = spoke_model.get_all_by_id()
    return jsonify(dict(assignment_status="assigned", vms=all_vms))
|
|
|
def handle_create(operation_id, request_body):
    """Answer a ``create`` operation: claim it from the hub, then create the VM.

    Steps:
      1. Require an operation_id and all create parameters (400 otherwise).
      2. Ask the hub to assign this operation to this host via
         ``/hub/claim-operation/<operation_id>/<SPOKE_HOST_ID>``.
      3. On 200, read ``network_name`` and ``public_ipv4`` from the hub's
         JSON reply and call SPOKE_MODEL.create().

    Args:
        operation_id: id of the operation being claimed; must be truthy.
        request_body: decoded request dict. It is mutated: ``network_name``
            and ``public_ipv4`` from the hub's reply are stored in it.

    Returns:
        JSON with ``assignment_status`` of ``"assigned"`` or
        ``"assigned_to_other_host"`` (hub answered 409). If
        SPOKE_MODEL.create() raises, ``error_message`` is included as well.

    Raises:
        HTTPException (via abort): 400 for missing operation_id/parameters;
        503 when the hub answers with anything other than 200/409, or with
        a 200 whose body is not the expected JSON object.
    """
    if not operation_id:
        current_app.logger.error("/hosts/operation returned 400: operation_id is required for create ")
        return abort(400, "bad request; operation_id is required. try POST /spoke/operation/<id>")

    parameters = ["email", "id", "os", "size", "template_image_file_name", "vcpus", "memory_mb", "ssh_authorized_keys"]
    # One "\n<name> is required for create" entry per missing parameter
    # (same text the original accumulated in a loop).
    error_message = "".join(
        f"\n{parameter} is required for create"
        for parameter in parameters
        if parameter not in request_body
    )
    if error_message != "":
        current_app.logger.error(f"/hosts/operation returned 400: {error_message}")
        return abort(400, f"bad request; {error_message}")

    # only one host should create the vm, so we first race to assign this create operation to ourselves.
    # only one host will win this race.
    authorization_header = f"Bearer {current_app.config['SPOKE_HOST_TOKEN']}"
    url = f"{current_app.config['HUB_URL']}/hub/claim-operation/{operation_id}/{current_app.config['SPOKE_HOST_ID']}"
    result = current_app.config['HTTP_CLIENT'].do_http_sync(url, body=None, authorization_header=authorization_header)

    if result.status_code == 409:
        # Another host won the race; nothing to create here.
        return jsonify(dict(assignment_status="assigned_to_other_host"))

    if result.status_code != 200:
        current_app.logger.error(f"{url} returned {result.status_code}: {result.body}")
        return abort(503, "hub did not cleanly handle our request to claim the create operation")

    # Only the decode itself is guarded. The original wrapped the validation
    # below in a bare except as well, which caught the exceptions raised by
    # abort() and replaced their specific messages with "did not return
    # valid json".
    try:
        assignment_info = json.loads(result.body)
    except (TypeError, ValueError):
        return abort(503, f"hub at '{url}' returned 200, but did not return valid json")

    if not isinstance(assignment_info, dict):
        return abort(503, f"hub at '{url}' returned 200, but did not return assignment_info json object")
    for required_key in ('network_name', 'public_ipv4'):
        if required_key not in assignment_info:
            return abort(503, f"hub at '{url}' returned 200, but the returned assignment_info object did not include {required_key}")

    assignment_status = "assigned"
    request_body['network_name'] = assignment_info['network_name']
    request_body['public_ipv4'] = assignment_info['public_ipv4']

    # TODO this should probably be spun off as a coroutine
    # and at the end of that coroutine, we should set the VM's state from "creating" to "booting"
    # see the corresponding TODO in hub_api.on_create_claimed
    try:
        current_app.config['SPOKE_MODEL'].create(
            email=request_body['email'],
            id=request_body['id'],
            template_image_file_name=request_body['template_image_file_name'],
            vcpus=request_body['vcpus'],
            memory_mb=request_body['memory_mb'],
            ssh_authorized_keys=[key['content'] for key in request_body['ssh_authorized_keys']],
            network_name=request_body['network_name'],
            public_ipv4=request_body['public_ipv4'],
        )
    except Exception:
        error_message = my_exec_info_message(sys.exc_info())
        logged_names = (
            'email', 'id', 'template_image_file_name', 'vcpus', 'memory_mb',
            'ssh_authorized_keys', 'network_name', 'public_ipv4',
        )
        # Produces the same "name='value',  name='value', " text as before.
        params = " ".join(
            f"{name}='{request_body.get(name, 'KeyError')}', " for name in logged_names
        )
        current_app.logger.error(f"spoke_model.create({params}) failed: {error_message}")
        return jsonify(dict(assignment_status=assignment_status, error_message=error_message))

    return jsonify(dict(assignment_status=assignment_status))
|
|
|
def handle_destroy(operation_id, request_body):
    """Answer a ``destroy`` operation for one VM.

    Looks the VM up with SPOKE_MODEL.get(); if it exists it is destroyed
    with SPOKE_MODEL.destroy(). A VM that does not exist is treated as
    already destroyed: a warning is logged and success is returned.

    Args:
        operation_id: unused by this handler.
        request_body: decoded request dict; ``id`` and ``email`` are required.

    Returns:
        JSON with ``assignment_status="assigned"`` and ``status`` of
        ``"success"``, or ``"error"`` plus ``error_message`` when the
        lookup or destroy raises.

    Raises:
        HTTPException (via abort): 400 when ``id`` or ``email`` is missing.
    """
    if 'id' not in request_body:
        current_app.logger.error("/hosts/operation returned 400: id is required for destroy")
        return abort(400, "bad request; id is required for destroy")

    if 'email' not in request_body:
        current_app.logger.error("/hosts/operation returned 400: email is required for destroy")
        return abort(400, "bad request; email is required for destroy")

    try:
        spoke_model = current_app.config['SPOKE_MODEL']
        vm = spoke_model.get(request_body['id'], False)
        if vm is None:
            # Previously this warning was logged on every destroy, including
            # the ones where the vm existed and was destroyed.
            current_app.logger.warning(f"destroy {request_body['id']} was called for {request_body['email']}, however the vm does not exist. returning success. ")
        else:
            spoke_model.destroy(id=request_body['id'], email=request_body['email'])
    except Exception:
        error_message = my_exec_info_message(sys.exc_info())
        params = " ".join(
            f"{name}='{request_body.get(name, 'KeyError')}', " for name in ('email', 'id')
        )
        current_app.logger.error(f"current_app.config['SPOKE_MODEL'].destroy({params}) failed: {error_message}")
        return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))

    return jsonify(dict(assignment_status="assigned", status="success"))
|
|
|
|
|
def handle_vm_state_command(operation_id, request_body):
    """Answer a ``vm_state_command`` operation.

    Requires ``id``, ``email`` and ``command`` in request_body, where
    ``command`` is one of stop, force-stop, start or restart (400
    otherwise), then forwards all three to SPOKE_MODEL.vm_state_command().

    Returns JSON with ``assignment_status="assigned"`` and ``status`` of
    ``"success"``, or ``"error"`` plus ``error_message`` when the model call
    raises. operation_id is not used.
    """
    for required_property in ('id', 'email', 'command'):
        if required_property in request_body:
            continue
        current_app.logger.error(f"/hosts/operation returned 400: {required_property} is required for vm_state_command")
        return abort(400, f"bad request; {required_property} is required for vm_state_command")

    command = request_body['command']
    allowed_commands = ("stop", "force-stop", "start", "restart")
    if command not in allowed_commands:
        current_app.logger.error(f"/hosts/operation returned 400: command ({command}) must be one of stop, force-stop, start, or restart")
        return abort(400, f"bad request; command ({command}) must be one of stop, force-stop, start, or restart")

    try:
        spoke_model = current_app.config['SPOKE_MODEL']
        spoke_model.vm_state_command(
            id=request_body['id'],
            email=request_body['email'],
            command=command,
        )
    except:
        # Any failure is reported to the caller as status="error".
        error_message = my_exec_info_message(sys.exc_info())
        params = " ".join(
            f"{name}='{request_body.get(name, 'KeyError')}', "
            for name in ('email', 'id', 'command')
        )
        current_app.logger.error(f"current_app.config['SPOKE_MODEL'].vm_state_command({params}) failed: {error_message}")
        return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))

    return jsonify(dict(assignment_status="assigned", status="success"))
|
|
|
def handle_net_set_dhcp(operation_id, request_body):
    """Answer a ``net_set_dhcp`` operation.

    Requires ``email``, ``network_name`` and ``macs`` in request_body, plus
    at least one of ``remove_ipv4`` / ``add_ipv4`` with a non-None value
    (400 otherwise). Whichever of the two is absent is stored in
    request_body as None, then everything is passed to
    SPOKE_MODEL.net_set_dhcp() with ``host_id=None``.

    Returns JSON with ``assignment_status="assigned"`` and ``status`` of
    ``"success"``, or ``"error"`` plus ``error_message`` when the model call
    raises. operation_id is not used.
    """
    for required_property in ('email', 'network_name', 'macs'):
        if required_property in request_body:
            continue
        current_app.logger.error(f"/hosts/operation returned 400: {required_property} is required for net_set_dhcp")
        return abort(400, f"bad request; {required_property} is required for net_set_dhcp")

    # A key that is absent and a key explicitly set to None both count as missing.
    remove_is_missing = request_body.get('remove_ipv4') is None
    add_is_missing = request_body.get('add_ipv4') is None

    if remove_is_missing and add_is_missing:
        current_app.logger.error("/hosts/operation returned 400: either remove_ipv4 or add_ipv4 is required for net_set_dhcp")
        return abort(400, "bad request; either remove_ipv4 or add_ipv4 is required for net_set_dhcp")

    # Make sure both keys exist so the lookups below cannot fail.
    for key, is_missing in (('remove_ipv4', remove_is_missing), ('add_ipv4', add_is_missing)):
        if is_missing:
            request_body[key] = None

    try:
        spoke_model = current_app.config['SPOKE_MODEL']
        spoke_model.net_set_dhcp(
            email=request_body['email'],
            host_id=None,
            network_name=request_body['network_name'],
            macs=request_body['macs'],
            remove_ipv4=request_body['remove_ipv4'],
            add_ipv4=request_body['add_ipv4'],
        )
    except:
        # Any failure is reported to the caller as status="error".
        error_message = my_exec_info_message(sys.exc_info())
        params = " ".join(
            f"{name}='{request_body.get(name, 'KeyError')}', "
            for name in ('email', 'network_name', 'macs', 'remove_ipv4', 'add_ipv4')
        )
        current_app.logger.error(f"current_app.config['SPOKE_MODEL'].net_set_dhcp({params}) failed: {error_message}")
        return jsonify(dict(assignment_status="assigned", status="error", error_message=error_message))

    return jsonify(dict(assignment_status="assigned", status="success"))