The API that I'm trying to connect to is the Iconik API, which is a REST API with a limit of 50 requests per second sustained, or 1000 requests over any 20-second period. I set up an asynchronous static method called _perform_api_request to connect to the API:
@staticmethod
async def _perform_api_request(request_data):
    """Perform a single GET request and return a JSON-serializable summary.

    Args:
        request_data: dict with keys "uri" (str, or a single-element list
            of str), "params" (dict of query params), and optionally
            "headers" (dict).

    Returns:
        dict with keys "status" (int HTTP status), "body" (decoded JSON
        response), and "retry_after" (int, taken from the response body,
        defaulting to 1).

    Raises:
        Exception: if the response status is not 200.
        asyncio.TimeoutError, aiohttp.ClientError: re-raised after logging.
    """
    # Some callers pass the URI wrapped in a single-element list.
    uri = request_data["uri"]
    if isinstance(uri, list):
        uri = uri[0]
    params = request_data["params"]
    headers = request_data.get("headers")

    # BUG FIX: without an explicit timeout, aiohttp waits up to its
    # 300 s default total timeout; connection-setup stalls (common under
    # Cloud Composer NAT/port exhaustion) then surface late and opaquely.
    # Fail fast so retry logic can react.
    timeout = aiohttp.ClientTimeout(total=60, connect=10)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            print(f"Making request to {uri} with params {params}")
            async with session.get(uri, params=params, headers=headers) as response:
                if response.status != 200:
                    raise Exception(
                        f"API request {uri} failed with status {response.status}: "
                        f"{await response.text()}"
                    )
                response_data = await response.json()
                # Prepare a dictionary with relevant data for serialization
                serializable_data = {
                    "status": response.status,
                    "body": response_data,
                    # NOTE(review): retry_after is read from a 200 body;
                    # rate-limited (429) responses raise above instead —
                    # confirm this is the intended contract.
                    "retry_after": response_data.get("retry_after", 1),
                }
                print(serializable_data["status"])
                return serializable_data
    except asyncio.TimeoutError:
        print(f"Timeout when connecting to {uri}")
        raise
    except aiohttp.ClientError as e:
        print(f"Client error: {e}")
        raise
I set up a DAG that will call this method, as I intend to run it in Cloud Composer:
import asyncio
from datetime import timedelta

from aiolimiter import AsyncLimiter
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

from utils.api_load import API_Load
# Default arguments applied to every task created inside the DAG.
default_args = {
    "owner": "owner",
    "depends_on_past": False,  # each run is independent of prior runs
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 5,  # Airflow-level retries on task failure
    "retry_delay": timedelta(minutes=5),
}
async def execute_api_request(request_data_list, config):
    """Execute every request in request_data_list, following pagination
    tokens when the source spec enables them.

    Args:
        request_data_list: list of request dicts consumed by
            API_Load._perform_api_request; each request's "params" dict is
            mutated in place with pagination tokens.
        config: pipeline config; config["source_spec"]["src_pagination"]
            toggles pagination handling.

    Returns:
        list of response dicts, in request order (all pages of a request
        appended consecutively).
    """
    pagination = config["source_spec"].get("src_pagination", False)
    # Iconik allows 50 req/s sustained and 1000 requests per 20 s window.
    # 40 requests per 1-second period stays safely under both limits.
    # BUG FIX: AsyncLimiter's default time_period is 60 s, so the original
    # AsyncLimiter(40) meant 40 requests per *minute* — pass the period.
    limiter = AsyncLimiter(40, 1)
    response_data_list = []

    # BUG FIX: the original only handled lists of length <= 1 (IndexError
    # on an empty list, implicit None return for longer lists), and held
    # the limiter once around the whole loop so paginated requests were
    # never actually throttled.
    for request_data in request_data_list:
        has_next_page = True
        while has_next_page:
            # Acquire the limiter per request so every page is rate-limited.
            async with limiter:
                response = await API_Load._perform_api_request(request_data)
            response_data_list.append(response)
            if not pagination:
                # Non-paginated APIs: stop after first response
                break
            token_value = None
            if "body" in response:
                for token_key in ["nextPageToken", "scroll_id"]:
                    token_value = response["body"].get(token_key)
                    if token_value:
                        request_data["params"][token_key] = token_value
                        break
            # Stop if no pagination token is found
            has_next_page = bool(token_value)
    return response_data_list
@dag(
    dag_id="bigapple_data_pipeline",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    description="DAG to execute the API request",
    catchup=False,
)
def data_pipeline_dag():
    """Daily pipeline that pulls data from the Iconik API."""

    # BUG FIX: the task was also named `execute_api_request`, shadowing the
    # module-level coroutine of the same name. Inside the task body,
    # `asyncio.run(execute_api_request(...))` therefore resolved to the task
    # function itself — a self-call on a non-coroutine — instead of the
    # intended async function. It also omitted the required `config` arg.
    @task(task_id="execute_api_request")
    def run_api_requests(request_data_list, config):
        return asyncio.run(execute_api_request(request_data_list, config))

    # NOTE(review): `request_data_list` and `config` were undefined here in
    # the original DAG file; they must be built before the task is invoked —
    # e.g. from Airflow Variables, `params`, or an upstream task. TODO confirm
    # where these come from in the real pipeline.
    run_api_requests(request_data_list, config)


data_pipeline_dag()
However, I keep getting this error when running the DAG in Cloud Composer. I can get to 64 requests before getting ConnectionTimeoutError:
ERROR - Task failed with exception\nTraceback (most recent call last):\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1109, in _wrap_create_connection\n sock = await aiohappyeyeballs.start_connection(\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/aiohappyeyeballs/impl.py", line 82, in start_connection\n sock = await _connect_sock(\n
^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/aiohappyeyeballs/impl.py", line 174, in _connect_sock\n await loop.sock_connect(sock, address)\n File "/opt/python3.11/lib/python3.11/asyncio/selector_events.py", line 638, in sock_connect\n return await fut\n
^^^^^^^^^\nasyncio.exceptions.CancelledError\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/client.py", line 663, in _request\n conn = await self._connector.connect(\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 538, in connect\n proto = await self._create_connection(req, traces, timeout)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1050, in _create_connection\n _, proto = await self._create_direct_connection(req, traces, timeout)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1384, in _create_direct_connection\n raise last_exc\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1353, in _create_direct_connection\n transp, proto = await self._wrap_create_connection(\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1106, in _wrap_create_connection\n async with ceil_timeout(\n File "/opt/python3.11/lib/python3.11/asyncio/timeouts.py", line 115, in aexit\n raise TimeoutError from exc_val\nTimeoutError\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 763, in _execute_task\n result = _execute_callable(context=context, **execute_callable_kwargs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable\n return ExecutionCallableRunner(\n ^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run\n return self.func(*args, **kwargs)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 415, in wrapper\n return func(self, *args, **kwargs)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/decorators/base.py", line 266, in execute\n return_value = super().execute(context)\n
^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 415, in wrapper\n return func(self, *args, **kwargs)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 238, in execute\n return_value = self.execute_callable()\n
^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 256, in execute_callable\n return runner.run(*self.op_args, **self.op_kwargs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run\n return self.func(*args, **kwargs)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/airflow/gcs/dags/data_pipeline.py", line 299, in perform_api_request\n return asyncio.run(processor.perform_api_request(request_data_list, config))\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/asyncio/runners.py", line 190, in run\n return runner.run(main)\n ^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/asyncio/runners.py", line 118, in run\n return self._loop.run_until_complete(task)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete\n return future.result()\n
^^^^^^^^^^^^^^^\n File "/home/airflow/gcs/dags/data_pipeline.py", line 188, in execute_api_request\n response = await API_Load._perform_api_request(request_data)\n
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/home/airflow/gcs/dags/utils/api_load.py", line 108, in _perform_api_request\n async with session.get(uri, params=params, headers=headers) as response:\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/client.py", line 1360, in aenter\n self._resp: _RetType = await self._coro\n
^^^^^^^^^^^^^^^^\n File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/client.py", line 667, in _request\n raise ConnectionTimeoutError(\naiohttp.client_exceptions.ConnectionTimeoutError: Connection timeout to host https://app.iconik.io/API/assets/v1/assets/
The strange thing is when I import execute_api_request from the DAG file and run it locally, I never get any TimeoutError. Is this an issue with my Cloud Composer set up or something in my code?
Hi @busuu,
Welcome to Google Cloud Community!
A "ConnectionTimeoutError" inside Cloud Composer/Airflow when connecting to the Iconik API usually points to a network, server-overload, rate-limiting, or environment-configuration issue rather than a code issue, since the main difference between your two runs is the environment: the code succeeds locally but times out on Cloud Composer, where outbound connections pass through the environment's networking (e.g. Cloud NAT) and may be throttled or exhausted.
Here are several suggestions that may help resolve the issue:
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.