
Cloud Composer Redis password (after 2.6.0)

Hello,

After migrating to Cloud Composer 2.6.0, we can see that password authentication has been added to the underlying Redis instance.

What I can't find in the documentation is how to access the Redis instance now, i.e. where I can find the password, or better yet, how I can access the instance from an Airflow task using the Redis connector.

Is there any way to know the Redis password, and is this documented at all in the GCP online docs?

In case you are wondering, for full clarity: we used to have an Airflow maintenance task that cleans up the Redis queue, and of course it stopped working after a password was added in Composer 2.6.0. The actual Python code of the Airflow task I'm trying to run is this:

import logging
from datetime import datetime

from airflow.models import DAG, TaskInstance
from airflow.operators.python import PythonOperator
from airflow.settings import Session
from airflow.utils.state import State
from redis import Redis

logger = logging.getLogger(__name__)


def clear_redis_cache():
    db_session = Session()
    tasks: list = (
        db_session.query(TaskInstance.external_executor_id)
        .filter(TaskInstance.state.in_([State.SUCCESS, State.FAILED]))
        .all()
    )
    db_session.close()

    def _remove_redis_keys():
        set_removes = set_redis_keys.intersection(set_postgres_tasks)
        if not set_removes:
            return 0

        logger.info("Removing %s keys.", len(set_removes))
        r.delete(*set_removes)
        set_redis_existing_keys.update(set_redis_keys.difference(set_removes))
        set_redis_keys.clear()
        set_postgres_tasks.difference_update(set_removes)

        return len(set_removes)

    celery_prefix = "celery-task-meta"
    set_postgres_tasks = {f"{celery_prefix}-{task_[0]}" for task_ in tasks if task_[0]}

    logger.info("Found %s not running tasks in Postgres", len(set_postgres_tasks))

    tasks.clear()
    total_removed = 0
    set_redis_keys = set()
    set_redis_existing_keys = set()

    r = Redis(host="airflow-redis-service.composer-system.svc.cluster.local")
    for r_key in r.scan_iter(match=f"{celery_prefix}-*"):
        if isinstance(r_key, bytes):
            r_key = r_key.decode()

        set_redis_keys.add(r_key)
        if len(set_redis_keys) >= 1000:
            total_removed += _remove_redis_keys()

    if set_redis_keys:
        total_removed += _remove_redis_keys()

    logger.info("Removed %s keys from Redis", total_removed)
    logger.info("%s keys exist in Redis.", len(set_redis_existing_keys))
    return


with DAG(
    "airflow_redis_cleanup",
    description="Process to clear the Redis cache",
    schedule="*/15 * * * *",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    max_active_runs=1,
) as dag:
    op_clear_redis = PythonOperator(
        task_id="clear_redis_cache",
        python_callable=clear_redis_cache,
        # provide_context is deprecated in Airflow 2; context is always passed
        priority_weight=100,
    )

1 ACCEPTED SOLUTION

Password authentication for the Redis instance is a notable security hardening introduced in Cloud Composer 2.6.0. Airflow tasks that connect to Redis directly need to be updated to keep working under the new scheme.

Strategies for Integrating Redis Authentication in Airflow Tasks:

Approach 1: Retrieve the Redis Password

  • Finding the Password: Directly accessing the Redis password via the Google Cloud Console or gcloud commands might not be possible due to security protocols. Refer to the Cloud Composer documentation and contact Google Cloud support for the most up-to-date and secure method to obtain the Redis credentials.
  • Environment Variables: Cloud Composer might expose the Redis password within environment variables in your Airflow environment. Explore this possibility as a convenient means of managing credentials.
  • Update Your Airflow Task: Once you have acquired the Redis password, incorporate it into your task's authentication process:
 
from redis import Redis

r = Redis(
    host="airflow-redis-service.composer-system.svc.cluster.local",
    password="your_redis_password",  # replace with the actual password
)
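If the Celery broker URL that Composer injects into the environment (Airflow's standard `AIRFLOW__CELERY__BROKER_URL` variable) carries the credentials, the password can be parsed out rather than hard-coded. A minimal sketch, assuming the URL follows the usual `redis://:password@host:port/db` shape; the helper name here is ours, not an Airflow API:

```python
import os
from urllib.parse import urlparse


def redis_credentials_from_broker_url(broker_url):
    """Split a redis:// broker URL into (host, port, password)."""
    parsed = urlparse(broker_url)
    return parsed.hostname, parsed.port or 6379, parsed.password


host, port, password = redis_credentials_from_broker_url(
    os.environ.get("AIRFLOW__CELERY__BROKER_URL", "redis://localhost:6379/0")
)
# r = Redis(host=host, port=port, password=password)
```

This avoids storing the password in DAG code; it only ever lives in the environment Composer manages.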

Approach 2: Internal Network Authentication (If Applicable)

  • Implicit Authentication: Cloud Composer might handle Redis authentication automatically for tasks running within its environment. In this case, modifying your tasks to include the password might be unnecessary.
  • Verification Needed: Consult documentation or Google Cloud support to confirm if your Cloud Composer setup manages Redis connections internally for Airflow tasks.
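One way to verify this yourself is to ping the instance without a password and see whether the server rejects the unauthenticated client. A small sketch; the helper name is ours, and the concrete exception class to pass in redis-py is `redis.exceptions.AuthenticationError`:

```python
def redis_auth_required(client, auth_error=Exception):
    """Return True if an unauthenticated PING is rejected by the server.

    `client` is anything with a ping() method (e.g. a redis.Redis built
    without a password); `auth_error` is the exception class the client
    raises on NOAUTH.
    """
    try:
        client.ping()
        return False
    except auth_error:
        return True

# Usage inside an Airflow task (assumes redis-py is installed):
# import redis
# r = redis.Redis(host="airflow-redis-service.composer-system.svc.cluster.local")
# if redis_auth_required(r, redis.exceptions.AuthenticationError):
#     ...  # fall back to an authenticated connection
```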

Important Considerations:

  • Stay Updated: Monitor Cloud Composer release notes for the latest information on Redis authentication and other relevant changes.
  • Security Practices: Prioritize explicit authentication with Redis whenever possible. This aligns with security best practices and safeguards your task against potential future changes in network configurations.

Revised Code Example (Assuming explicit authentication):

 
import logging 
from datetime import datetime 
from airflow.models import DAG, TaskInstance 
from airflow.operators.python import PythonOperator 
from airflow.settings import Session 
from airflow.utils.state import State 
from redis import Redis 

def clear_redis_cache(): 
    # Your existing logic to clear the Redis cache

    # Updated Redis connection with authentication 
    r = Redis(host="airflow-redis-service.composer-system.svc.cluster.local", 
              password='your_redis_password' # Use the retrieved password ) 

    # Rest of your code 

# DAG definition remains the same 

Successfully adapting to the Redis authentication in Cloud Composer 2.6.0 requires determining the appropriate way to obtain the Redis password and updating your Airflow tasks. Thoroughly consulting the documentation and leveraging Google Cloud support will streamline this process.


3 REPLIES


Did you find a solution to your problem? The accepted solution does not provide any help.

Hi, you can grab the whole Redis connection URL from the environment variables and use it for the Redis connection:

import os

import redis

redis_conn_url = os.environ.get("AIRFLOW__CELERY__BROKER_URL")
redis_client = redis.Redis.from_url(redis_conn_url)
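Building on that reply, a small defensive wrapper helps when the variable is missing (e.g. when the DAG is parsed or tested outside the Composer environment). A sketch; the function name is ours, not an Airflow API:

```python
import os


def broker_url_from_env(env=os.environ):
    """Return the Celery broker URL Composer injects, or fail loudly."""
    url = env.get("AIRFLOW__CELERY__BROKER_URL")
    if not url:
        raise RuntimeError(
            "AIRFLOW__CELERY__BROKER_URL is not set; run inside the "
            "Composer/Airflow environment"
        )
    return url

# redis_client = redis.Redis.from_url(broker_url_from_env())
```

`redis.Redis.from_url` parses the host, port, db, and password out of the URL in one step, so no credentials ever need to appear in DAG code.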