
ConnectionTimeoutError inside Cloud Composer/Airflow when connecting to an API

The API I'm trying to connect to is the Iconik API, a REST API with a limit of 50 requests per second sustained, or 1,000 requests over any 20-second period. I set up an asynchronous static method called _perform_api_request to call the API:


# Excerpt from utils/api_load.py; this is a method of the API_Load class,
# which imports asyncio and aiohttp at module level.
@staticmethod
async def _perform_api_request(request_data):
    # Ensure `uri` is a string
    uri = request_data["uri"]

    if isinstance(uri, list):
        uri = uri[0]

    params = request_data["params"]
    headers = request_data.get("headers", None)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(uri, params=params, headers=headers) as response:
                print(f"Making request to {uri} with params {params}")
                if response.status != 200:
                    raise Exception(
                        f"API request {uri} failed with status {response.status}: "
                        f"{await response.text()}"
                    )

                response_data = await response.json()
                # Prepare a dictionary with relevant data for serialization
                serializable_data = {
                    "status": response.status,
                    "body": response_data,
                    "retry_after": response_data.get("retry_after", 1),
                }
                print(serializable_data["status"])
                return serializable_data
    except asyncio.TimeoutError:
        print(f"Timeout when connecting to {uri}")
        raise
    except aiohttp.ClientError as e:
        print(f"Client error: {e}")
        raise

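For reference, I'm not passing a timeout here, so the session above runs on aiohttp's default timeout settings; an explicit aiohttp.ClientTimeout can be attached when the session is created. A minimal sketch (fetch_json is just a placeholder helper, and the 30s/10s values are illustrative, not recommendations):

import aiohttp

async def fetch_json(uri):
    # Illustrative sketch: fail fast on slow connection attempts instead
    # of relying on the library defaults; the values here are arbitrary.
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(uri) as response:
            return await response.json()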


I set up a DAG that calls this method, since I intend to run it in Cloud Composer:


from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta
from aiolimiter import AsyncLimiter
from utils.api_load import API_Load
import asyncio

default_args = {
    "owner": "owner",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 5,
    "retry_delay": timedelta(minutes=5),
}
async def execute_api_request(request_data_list, config):
    pagination = config["source_spec"].get("src_pagination", False)
    limiter = AsyncLimiter(40)
    if len(request_data_list) <= 1:
        request_data = request_data_list[0]
        has_next_page = True
        response_data_list = []
        async with limiter:
            while has_next_page:
                response = await API_Load._perform_api_request(request_data)
                response_data_list.append(response)

                if pagination:
                    token_value = None
                    if "body" in response:
                        for token_key in ["nextPageToken", "scroll_id"]:
                            token_value = response["body"].get(token_key)
                            if token_value:
                                request_data["params"][token_key] = token_value
                                break
                    # Stop if no pagination token is found
                    has_next_page = bool(token_value)
                else:
                    # Non-paginated APIs: stop after first response
                    has_next_page = False
        return response_data_list

@dag(
    dag_id="bigapple_data_pipeline",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    description="DAG to execute the API request",
    catchup=False,
)
def data_pipeline_dag():

    @task()
    def perform_api_request(request_data_list, config):
        # Run the async helper to completion inside the synchronous task
        return asyncio.run(execute_api_request(request_data_list, config))

    # request_data_list and config are built elsewhere in the DAG file
    perform_api_request(request_data_list, config)

data_pipeline_dag()


However, I keep getting the error below when running the DAG in Cloud Composer. I can get to 64 requests before a ConnectionTimeoutError is raised:

ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1109, in _wrap_create_connection
    sock = await aiohappyeyeballs.start_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohappyeyeballs/impl.py", line 82, in start_connection
    sock = await _connect_sock(
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohappyeyeballs/impl.py", line 174, in _connect_sock
    await loop.sock_connect(sock, address)
  File "/opt/python3.11/lib/python3.11/asyncio/selector_events.py", line 638, in sock_connect
    return await fut
           ^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/client.py", line 663, in _request
    conn = await self._connector.connect(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 538, in connect
    proto = await self._create_connection(req, traces, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1050, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1384, in _create_direct_connection
    raise last_exc
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1353, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 1106, in _wrap_create_connection
    async with ceil_timeout(
  File "/opt/python3.11/lib/python3.11/asyncio/timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 763, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 415, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/decorators/base.py", line 266, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 415, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/gcs/dags/data_pipeline.py", line 299, in perform_api_request
    return asyncio.run(processor.perform_api_request(request_data_list, config))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/airflow/gcs/dags/data_pipeline.py", line 188, in execute_api_request
    response = await API_Load._perform_api_request(request_data)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/gcs/dags/utils/api_load.py", line 108, in _perform_api_request
    async with session.get(uri, params=params, headers=headers) as response:
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/client.py", line 1360, in __aenter__
    self._resp: _RetType = await self._coro
                           ^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/aiohttp/client.py", line 667, in _request
    raise ConnectionTimeoutError(
aiohttp.client_exceptions.ConnectionTimeoutError: Connection timeout to host https://app.iconik.io/API/assets/v1/assets/

The strange thing is that when I import execute_api_request from the DAG file and run it locally, I never get a TimeoutError. Is this an issue with my Cloud Composer setup or with something in my code?



Hi @busuu,

Welcome to Google Cloud Community!

The ConnectionTimeoutError inside Cloud Composer/Airflow when connecting to the Iconik API suggests a network, server-overload, rate-limiting, or configuration issue rather than a code issue, given that the same code behaves differently when run locally and when executed on Cloud Composer.

Here are several suggestions that may help resolve the issue:

  • Rate limiting: Ensure you're not exceeding the Iconik API limits (50 requests/second sustained, or 1,000 requests over any 20-second period). Locally your calls may be naturally spaced out, but your DAG may run tasks and retries concurrently, all calling _perform_api_request at the same time and tripping the rate limit; see the limiter sketch after this list.
  • Connectivity: Rule out network problems between your Composer environment and the Iconik API: check firewall rules and DNS, confirm the network configuration of your Composer environment, and assess whether high latency between Cloud Composer and the API could be contributing to the timeouts.
  • Implement exponential backoff: Reduce your request rate and increase the wait time between retry attempts so that failed requests back off instead of immediately re-hitting the rate limit; a sketch follows below.
  • Configuration: Verify your Cloud Composer/Airflow configuration, and make sure you're not hitting resource limits (CPU, memory) on your Airflow workers.
  • DAG design: Simplify the API calls in your DAG so that task-level retries (your default_args set retries: 5) don't multiply the number of requests and push you further over the limit.
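
For the rate-limiting point, here is a minimal sketch using aiolimiter (the package that provides the AsyncLimiter your DAG already uses); the 40-per-second figure is an illustrative value below Iconik's documented 50 requests/second, not a recommendation. Note that in your DAG the "async with limiter:" block wraps the entire pagination loop, so the limiter is only acquired once; acquiring it once per request actually throttles every call:

from aiolimiter import AsyncLimiter
from utils.api_load import API_Load

# Illustrative: allow at most 40 requests per 1-second window, below the
# documented 50 requests/second sustained limit.
limiter = AsyncLimiter(40, 1)

async def rate_limited_request(request_data):
    # Acquire the limiter once per request, not once around the whole
    # pagination loop, so every call is throttled.
    async with limiter:
        return await API_Load._perform_api_request(request_data)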

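For the exponential backoff point, a minimal sketch with plain asyncio; the attempt count and delays are assumptions, not Iconik recommendations:

import asyncio
import aiohttp
from utils.api_load import API_Load

async def request_with_backoff(request_data, max_attempts=5):
    # Retry transient timeouts with exponentially growing waits
    # (1s, 2s, 4s, 8s, ...) before giving up.
    for attempt in range(max_attempts):
        try:
            return await API_Load._perform_api_request(request_data)
        except (asyncio.TimeoutError, aiohttp.ClientConnectionError):
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)
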
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.