Hello,
After migrating to Cloud Composer 2.6.0, we can see that password authentication has been added to the underlying Redis instance.
What I can't find in the documentation is how to access the Redis instance now, i.e. where to find the password, or better yet, how to access the instance from an Airflow task using the Redis connector.
Is there any way to find out the Redis password, and is this documented at all in the GCP online docs?
For full clarity, in case you are wondering: we used to have an Airflow maintenance task that cleaned up the Redis queue, and it of course stopped working after a password was added in Composer 2.6.0. The actual Python code of the Airflow task I'm trying to run is this:
import logging
from datetime import datetime

from airflow.models import DAG, TaskInstance
from airflow.operators.python import PythonOperator
from airflow.settings import Session
from airflow.utils.state import State
from redis import Redis

logger = logging.getLogger(__name__)


def clear_redis_cache():
    db_session = Session()
    tasks: list = (
        db_session.query(TaskInstance.external_executor_id)
        .filter(TaskInstance.state.in_([State.SUCCESS, State.FAILED]))
        .all()
    )

    def _remove_redis_keys():
        set_removes = set_redis_keys.intersection(set_postgres_tasks)
        if not set_removes:
            return 0
        logger.info("Removing %s keys.", len(set_removes))
        r.delete(*set_removes)
        set_redis_existing_keys.update(set_redis_keys.difference(set_removes))
        set_redis_keys.clear()
        set_postgres_tasks.difference_update(set_removes)
        return len(set_removes)

    celery_prefix = "celery-task-meta"
    set_postgres_tasks = {f"{celery_prefix}-{task_[0]}" for task_ in tasks if task_[0]}
    logger.info("Found %s not running tasks in Postgres", len(set_postgres_tasks))
    tasks.clear()

    total_removed = 0
    set_redis_keys = set()
    set_redis_existing_keys = set()
    r = Redis(host="airflow-redis-service.composer-system.svc.cluster.local")
    for r_key in r.scan_iter(match=f"{celery_prefix}-*"):
        if isinstance(r_key, bytes):
            r_key = r_key.decode()
        set_redis_keys.add(r_key)
        if len(set_redis_keys) >= 1000:
            total_removed += _remove_redis_keys()
    if set_redis_keys:
        total_removed += _remove_redis_keys()
    logger.info("Removed %s keys from Redis", total_removed)
    logger.info("%s keys exist in Redis.", len(set_redis_existing_keys))
    return


with DAG(
    "airflow_redis_cleanup",
    description="Process for clear Redis cache",
    schedule="*/15 * * * *",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    max_active_runs=1,
) as dag:
    op_clear_redis = PythonOperator(
        task_id="clear_redis_cache",
        python_callable=clear_redis_cache,
        provide_context=True,
        priority_weight=100,
    )
The integration of password authentication for Redis instances in Cloud Composer 2.6.0 marks a notable security upgrade. It's essential to adjust your Airflow tasks to align with this change and maintain both functionality and security.
Strategies for Integrating Redis Authentication in Airflow Tasks:
Approach 1: Retrieve the Redis Password
from redis import Redis
r = Redis(host="airflow-redis-service.composer-system.svc.cluster.local",
          password='your_redis_password')  # Replace with the actual password
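Rather than hardcoding the password in the DAG file, it can be read at runtime, for example from an environment variable. A minimal sketch — the variable name REDIS_PASSWORD is purely an assumption here, not a name Composer defines:

```python
import os

# Sketch: read the password from an environment variable instead of hardcoding
# it. "REDIS_PASSWORD" is a hypothetical name, not a Composer-defined variable.
conn_kwargs = {
    "host": "airflow-redis-service.composer-system.svc.cluster.local",
    "password": os.environ.get("REDIS_PASSWORD"),
}

# The kwargs would then be passed to the client, e.g.:
#   r = Redis(**conn_kwargs)
print(conn_kwargs["host"])
```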
Approach 2: Internal Network Authentication (If Applicable)
Important Considerations:
Revised Code Example (Assuming explicit authentication):
import logging
from datetime import datetime
from airflow.models import DAG, TaskInstance
from airflow.operators.python import PythonOperator
from airflow.settings import Session
from airflow.utils.state import State
from redis import Redis
def clear_redis_cache():
    # Your existing logic to clear the Redis cache

    # Updated Redis connection with authentication
    r = Redis(host="airflow-redis-service.composer-system.svc.cluster.local",
              password='your_redis_password')  # Use the retrieved password

    # Rest of your code

# DAG definition remains the same
Successfully adapting to the Redis authentication in Cloud Composer 2.6.0 requires determining the appropriate way to obtain the Redis password and updating your Airflow tasks. Thoroughly consulting the documentation and leveraging Google Cloud support will streamline this process.
Did you find a solution to your problem? The accepted solution does not provide any help.
Hi, you can grab the whole Redis connection URL from environment variables and use it for the Redis connection:
import os
import redis

redis_conn_url = os.environ.get('AIRFLOW__CELERY__BROKER_URL')
redis_client = redis.Redis.from_url(redis_conn_url)
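This works because the Celery broker URL already embeds the password, so no separate lookup is needed. If only the credentials themselves are wanted, they can be parsed out of the URL with the standard library. A sketch using a made-up URL (the real value comes from the `AIRFLOW__CELERY__BROKER_URL` environment variable inside the Composer environment):

```python
from urllib.parse import urlparse

# Made-up URL in the redis:// form Celery uses; the real one comes from
# os.environ["AIRFLOW__CELERY__BROKER_URL"] inside the Composer environment.
broker_url = "redis://:s3cret@airflow-redis-service.composer-system.svc.cluster.local:6379/0"

parsed = urlparse(broker_url)
print(parsed.password)   # the Redis password embedded in the URL
print(parsed.hostname)   # the Redis service hostname
```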