I'm trying to run an Apache Beam Dataflow job directly on GCP using Artifact Registry. Basically, I'm trying to access a SQL Server instance using ReadFromJdbc, but I keep getting the same error message in the Dataflow pipeline. Unfortunately, there is no other error message from the workers, only the one below:
ERROR:root:Error: Cannot start an expansion service since neither Java nor Docker executables are available in the system.
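From what I understand, ReadFromJdbc is a cross-language transform, so Beam needs either a java or a docker executable visible to the process that constructs the pipeline (the Flex Template launcher in my case, if I understand correctly) in order to auto-start the expansion service. As a debugging step I added this quick check at the top of main.py; it is only a sketch and nothing Beam-specific:

# Quick sanity check: can the process that builds the pipeline see java or docker?
# My assumption is that the cross-language expansion happens here (in the launcher),
# not on the Dataflow workers.
import shutil

print("java on PATH:  ", shutil.which("java"))
print("docker on PATH:", shutil.which("docker"))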
I have tried many different Dockerfiles; my latest one is the following:
FROM apache/beam-python3.9-sdk:2.52.0
# Install necessary packages. Update first.
RUN apt-get update && apt-get install -y --no-install-recommends \
        openjdk-11-jdk \
        default-jre \
        python3-pip \
        netcat \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
# Set JAVA_HOME (important for JDBC drivers)
ENV JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
# Create a directory for the JDBC driver
RUN mkdir -p /opt/jdbc
# Copy the JDBC driver to the container
COPY mssql-jdbc-12.8.1.jre11.jar /opt/jdbc/
# Install Apache Beam and other dependencies
RUN pip install --no-cache-dir apache-beam[gcp]==2.52.0
# Set working directory
WORKDIR /app
# Copy the Beam pipeline code to the container
COPY main.py .
# Set the entry point for the Dataflow job
ENTRYPOINT ["python", "main.py"]
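To double-check that the built image actually contains what the Dockerfile is supposed to add, I run a small Python snippet inside the container (purely a debugging sketch):

# Run inside the built container to confirm Java and the JDBC driver are really there.
import os
import shutil

print("JAVA_HOME:", os.environ.get("JAVA_HOME"))
print("java executable:", shutil.which("java"))
print("JDBC driver present:", os.path.exists("/opt/jdbc/mssql-jdbc-12.8.1.jre11.jar"))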
Then I have the following main.py:
import argparse
import logging

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
def run(argv=None):
    """Runs the Apache Beam pipeline that reads rows from SQL Server via JDBC and prints them.

    Args:
        argv: Command-line arguments (project, region, staging/temp locations,
            network, subnetwork, service account email, runner); see the
            argparse definitions below.
    """
    try:
        parser = argparse.ArgumentParser(description="Dataflow Flex Template")
        parser.add_argument("--project", required=True, help="GCP project.")
        parser.add_argument("--region", required=True, help="GCP region.")
        parser.add_argument("--staging_location", required=True, help="GCS staging location.")
        parser.add_argument("--temp_location", required=True, help="GCS temp location.")
        parser.add_argument("--network", help="VPC Network Name")
        parser.add_argument("--subnetwork", help="Subnetwork URL")
        parser.add_argument("--service_account_email", help="Service account email")
        parser.add_argument("--runner", default="DataflowRunner", help="Dataflow Runner")

        # Changed this to parse_known_args
        args, pipeline_args = parser.parse_known_args(argv)

        # Print argument values for debugging
        print(f"Project: {args.project}")
        print(f"Region: {args.region}")
        print(f"Staging Location: {args.staging_location}")
        print(f"Temp Location: {args.temp_location}")
        print(f"Network: {args.network}")
        print(f"Subnetwork: {args.subnetwork}")
        print(f"Service Account Email: {args.service_account_email}")

        pipeline_options = PipelineOptions(
            network=args.network,
            subnetwork=args.subnetwork,
            save_main_session=True,
            no_use_public_ips=True  # Disable public IPs
        )
        google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
        google_cloud_options.project = args.project
        google_cloud_options.region = args.region
        google_cloud_options.staging_location = args.staging_location
        google_cloud_options.temp_location = args.temp_location
        google_cloud_options.service_account_email = args.service_account_email

        standard_options = pipeline_options.view_as(StandardOptions)
        standard_options.runner = 'DataflowRunner'

        db_user = "sqlserver"  # Retrieve from environment variable
        db_password = "root"  # Retrieve from environment variable
        jdbc_url = "jdbc:sqlserver://XXXXX:1433;databaseName=XXX;trustServerCertificate=true;"
        query = "SELECT TOP 10 * FROM production.brands"

        with beam.Pipeline(options=pipeline_options) as pipeline:
            # Read rows from SQL Server via the cross-language JDBC transform
            data = pipeline | "Read from SQL Server" >> ReadFromJdbc(
                table_name='brands',
                driver_class_name='com.microsoft.sqlserver.jdbc.SQLServerDriver',
                jdbc_url=jdbc_url,
                username=db_user,
                password=db_password,
                query=query,
                fetch_size=1,
                classpath=['/opt/jdbc/mssql-jdbc-12.8.1.jre11.jar']  # Corrected classpath
            )
            data | "Print Data" >> beam.Map(print)
    except Exception as e:
        logging.error(f"Error: {e}")


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
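For completeness, I also wondered whether handing ReadFromJdbc an explicit expansion service (instead of letting Beam decide how to start one) would change anything. This is only a sketch of what I mean; the jar path below is an assumption on my part, not a jar I have actually verified, and as far as I understand it would still need java available in the launching environment:

from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.transforms.external import JavaJarExpansionService

# Hypothetical path: an expansion-service jar baked into the image next to the driver.
EXPANSION_JAR = "/opt/jdbc/beam-expansion-service.jar"

read_from_sql = ReadFromJdbc(
    table_name="brands",
    driver_class_name="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    jdbc_url="jdbc:sqlserver://XXXXX:1433;databaseName=XXX;trustServerCertificate=true;",
    username="sqlserver",
    password="root",
    query="SELECT TOP 10 * FROM production.brands",
    classpath=["/opt/jdbc/mssql-jdbc-12.8.1.jre11.jar"],
    # Sketch: supply the expansion service explicitly rather than relying on Beam's
    # auto-start logic; JavaJarExpansionService runs the given jar locally.
    expansion_service=JavaJarExpansionService(EXPANSION_JAR),
)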
I generate my Flex Template like this:
gcloud dataflow flex-template build $TEMPLATE_PATH \
    --image-gcr-path "us-east1-docker.pkg.dev/$PROJECT_ID/dataflow-abc-repo/sql-server-to-bq-2:latest" \
    --sdk-language "PYTHON" \
    --flex-template-base-image "PYTHON3" \
    --py-path "." \
    --worker-region us-east1 \
    --service-account-email gcp-terraform-sa@csf-lab.iam.gserviceaccount.com \
    --env "FLEX_TEMPLATE_PYTHON_PY_FILE=main.py" \
    --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"
Does anyone know what's wrong with the code above, or have a working Docker/Artifact Registry example to share? I've already lost many days on this and can't figure it out.
Appreciate the help, thanks!