You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
186 lines
6.5 KiB
186 lines
6.5 KiB
|
|
import sys |
|
import json |
|
import itertools |
|
import time |
|
import threading |
|
|
|
import aiohttp |
|
import asyncio |
|
from flask import current_app |
|
from capsulflask.shared import OnlineHost, my_exec_info_message |
|
from typing import List |
|
|
|
class HTTPResult:
    """Plain holder for the outcome of one HTTP request."""

    def __init__(self, status_code, body=None):
        # status_code: numeric status of the response (do_http stores -1 here
        # when the request itself failed and no response exists).
        # body: response text, or None when there is none.
        self.status_code, self.body = status_code, body
|
|
|
class InterThreadResult:
    """Pairs an HTTPResult with an optional error description.

    Instances are produced by the coroutines and consumed by the synchronous
    wrappers, which log ``error`` and hand ``http_result`` to the caller.
    """

    def __init__(self, http_result, error=None):
        # error stays None when the request completed without problems.
        self.http_result, self.error = http_result, error
|
|
|
class MyHTTPClient:
    """HTTP client that lets synchronous (Flask) code drive aiohttp requests.

    The ``*_sync`` methods submit the matching coroutine through
    ``run_coroutine`` and block on the returned future.  The coroutines report
    request failures inside an ``InterThreadResult`` instead of raising.
    """

    def __init__(self, timeout_seconds = 5):
        # Total per-request timeout, passed to aiohttp.ClientTimeout(total=...).
        self.timeout_seconds = timeout_seconds
        # aiohttp.ClientSession, created on first use by get_client_session().
        self.client_session = None

    def _log_and_unwrap(self, inter_thread_result):
        """Log the result's error, if it has one, and return its HTTPResult."""
        if inter_thread_result.error:
            current_app.logger.error(inter_thread_result.error)
        return inter_thread_result.http_result

    def do_multi_http_sync(self, online_hosts: "List[OnlineHost]", url_suffix: str, body: str, authorization_header=None) -> "List[HTTPResult]":
        """Blocking wrapper around do_multi_http.

        Returns one HTTPResult per entry of ``online_hosts``, in the same
        order.  Per-host errors are logged via ``current_app.logger``.
        """
        future = run_coroutine(self.do_multi_http(
            online_hosts=online_hosts,
            url_suffix=url_suffix,
            body=body,
            authorization_header=authorization_header,
        ))
        return [self._log_and_unwrap(result) for result in future.result()]

    def do_http_sync(self, url: str, body: str, method="POST", authorization_header=None) -> "HTTPResult":
        """Blocking wrapper around do_http; logs the error, returns the HTTPResult."""
        future = run_coroutine(self.do_http(
            method=method,
            url=url,
            body=body,
            authorization_header=authorization_header,
        ))
        return self._log_and_unwrap(future.result())

    def get_client_session(self):
        """Return the shared aiohttp.ClientSession, creating it on first call."""
        if not self.client_session:
            self.client_session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout_seconds)
            )

        return self.client_session

    async def do_http(self, url: str, body: str, method="POST", authorization_header=None) -> "InterThreadResult":
        """Perform one HTTP request and read the response body as text.

        Returns an InterThreadResult.  On success its ``error`` is None.  If
        the request fails, the HTTPResult has status -1 and a JSON body with
        an ``error_message`` key.  If reading the body fails, the real status
        is kept and the body is again a JSON ``error_message`` document.
        """
        # TODO make a configuration option where this throws an error if the url does not start with https://
        headers = {}
        if authorization_header is not None:
            headers['Authorization'] = authorization_header
        if body:
            headers['Content-Type'] = "application/json"

        try:
            # NOTE(review): body is annotated as str but is passed through
            # json=, which presumably makes aiohttp JSON-encode it once more.
            # Left unchanged because receivers may rely on that -- confirm.
            response = await self.get_client_session().request(
                method=method,
                url=url,
                json=body,
                headers=headers,
                verify_ssl=True,
                # these are supported in a later version but should not be needed... ?
                # https://github.com/aio-libs/aiohttp/issues/2304
                # https://github.com/jorop/aiohttp/commit/d882ed3169eb3e223902f4e85b67bbdfc25169da
                # max_line_size=65536,
                # max_headers=65536,
                # max_field_size=65536,
            )
        except Exception:
            # Exception rather than a bare except: task cancellation and
            # interpreter-exit signals must propagate instead of being
            # converted into a fake "-1" response.
            error_message = my_exec_info_message(sys.exc_info())
            response_body = json.dumps({"error_message": f"do_http (HTTP {method} {url}) {error_message}"})

            return InterThreadResult(
                HTTPResult(-1, response_body),
                f"""do_http (HTTP {method} {url}) failed with: {error_message}"""
            )

        try:
            response_body = await response.text()
        except Exception:
            error_message = my_exec_info_message(sys.exc_info())
            response_body = json.dumps({"error_message": f"error reading response: HTTP {method} {url} (status {response.status}) failed with: {error_message}"})
            return InterThreadResult(
                HTTPResult(response.status, response_body),
                f"""error reading response: HTTP {method} {url} (status {response.status}) failed with: {error_message}"""
            )

        return InterThreadResult(HTTPResult(response.status, response_body), None)

    async def do_multi_http(self, online_hosts: "List[OnlineHost]", url_suffix: str, body: str, authorization_header=None) -> "List[InterThreadResult]":
        """Send the same request to ``host.url + url_suffix`` for every host, concurrently.

        Returns the InterThreadResults in the same order as ``online_hosts``.
        """
        # Build the coroutines in the same order as online_hosts ...
        requests = [
            self.do_http(url=f"{host.url}{url_suffix}", body=body, authorization_header=authorization_header)
            for host in online_hosts
        ]
        # ... because gather (like Promise.all in javascript) resolves to the
        # results in the order the awaitables were passed in.
        results = await asyncio.gather(*requests)

        return results
|
|
|
|
|
# This is lifted directly from https://stackoverflow.com/a/58616001
# It is the bridge between Flask's one-thread-per-request world
# and aiohttp's event-loop-based world -- it allows us to call run_coroutine from a Flask request handler.
|
|
|
class EventLoopThread(threading.Thread):
    """Daemon thread that creates an asyncio event loop and runs it until stop()."""

    # Event loop created by run(); None before the thread runs and after stop().
    loop = None
    # Counter shared by all instances, used to number the thread names.
    _count = itertools.count(0)

    def __init__(self):
        thread_name = "{}-{}".format(type(self).__name__, next(self._count))
        super().__init__(name=thread_name, daemon=True)

    def __repr__(self):
        current = self.loop
        if current is None:
            # No loop available: report it as not running and closed.
            running, closed, debug = False, True, False
        else:
            running = current.is_running()
            closed = current.is_closed()
            debug = current.get_debug()
        head = f"<{type(self).__name__} {self.name} id={self.ident} "
        tail = f"running={running} closed={closed} debug={debug}>"
        return head + tail

    def run(self):
        """Thread body: create the loop, run it forever, then tear it down."""
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            # Loops without shutdown_asyncgens raise AttributeError here;
            # in that case the async-generator cleanup step is skipped.
            try:
                pending_shutdown = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(pending_shutdown)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        """Ask the loop to stop and wait for the thread to finish; no-op without a loop."""
        loop = self.loop
        self.loop = None
        if loop is not None:
            # loop.stop must be scheduled from this (foreign) thread.
            loop.call_soon_threadsafe(loop.stop)
            self.join()
|
|
|
# The single background EventLoopThread shared by the whole module;
# None until get_event_loop() starts one, and again after stop_event_loop().
_loop_thread = None

# Held while _loop_thread is being created or torn down.
_lock = threading.Lock()
|
|
|
def get_event_loop():
    """Return the event loop of the shared background thread, starting it if needed.

    The first caller creates and starts an EventLoopThread and waits up to one
    second for it to create its loop.  The thread is stored in the module
    global only after that wait, so callers taking the lock-free fast path
    never see a thread whose loop has not been created yet.

    Returns the loop, or None if the thread did not produce a loop within the
    one-second wait.
    """
    global _loop_thread

    # Work on a local reference: the global may be reset to None by
    # stop_event_loop() at any time.
    thread = _loop_thread
    if thread is None:
        with _lock:
            thread = _loop_thread
            if thread is None:
                thread = EventLoopThread()
                thread.start()
                # give the thread up to a second to produce a loop
                # (monotonic clock: unaffected by wall-clock adjustments)
                deadline = time.monotonic() + 1
                while thread.loop is None and time.monotonic() < deadline:
                    time.sleep(0.001)
                _loop_thread = thread

    return thread.loop
|
|
|
def stop_event_loop():
    """Stop the shared background event-loop thread, if one exists, and forget it."""
    global _loop_thread

    with _lock:
        if _loop_thread is None:
            # Nothing was ever started (or it was already stopped).
            return
        _loop_thread.stop()
        _loop_thread = None
|
|
|
def run_coroutine(coro):
    """Submit *coro* to the event loop that runs in the separate background thread.

    Returns a Future; call its ``result()`` method to get the coroutine's output.
    """
    background_loop = get_event_loop()
    return asyncio.run_coroutine_threadsafe(coro, background_loop)