
Dataflow: ModuleNotFoundError: No module named 'src'

Hi

 

I've been trying to launch a Dataflow job with a flex template using the Python SDK. The job starts and then fails with the error ModuleNotFoundError: No module named 'src'.

I'll provide some context:

Attached screenshots: file tree, Dockerfile, setup.py, requirements.txt, metadata.json, e_commerce_batch.py

Then, in Cloud Shell, I run the following:

 

gcloud dataflow flex-template build gs://${bucket}/e_commerce_batch.json \
--image-gcr-path "${region}-docker.pkg.dev/${proyecto}/${artifact_registry_name}/dataflow/e_commerce_batch:latest" \
--sdk-language "PYTHON" \
--flex-template-base-image "PYTHON3" \
--metadata-file "metadata.json" \
--py-path "." \
--py-path "src/" \
--py-path "src/processors/" \
--env "FLEX_TEMPLATE_PYTHON_PY_FILE=e_commerce_batch.py" \
--env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" \
 

 What am I missing? I don't want to move the src.processors code to the main python file (e_commerce_batch.py) because that would make that file less readable.
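
(The attached setup.py isn't reproduced here; for context, a minimal sketch of the kind of setup.py that makes the src package installable on the workers might look like the following. The name, version, and the assumption that src/ and src/processors/ each contain an __init__.py are illustrative, not the actual file.)

# setup.py -- illustrative sketch only, not the attached file.
# Assumes src/ and src/processors/ each contain an __init__.py.
import setuptools

setuptools.setup(
    name="e_commerce_batch",              # assumed package name
    version="0.1.0",                      # assumed version
    packages=setuptools.find_packages(),  # discovers src and src.processors
)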

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost



 

1 ACCEPTED SOLUTION

EUREKA!

I've solved the issue by adding save_main_session=True in my python code!

See Pickling and Managing the Main Session for more info.
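
For anyone landing here, a minimal sketch of where the option goes. save_main_session is a standard Apache Beam pipeline option; the pipeline body below is just a placeholder.

# Minimal sketch -- the pipeline body is a placeholder; the relevant part is
# passing save_main_session=True so the main session is pickled for the workers.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(save_main_session=True)
with beam.Pipeline(options=options) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)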

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost


11 REPLIES

UPDATE: I forgot to add the FLEX_TEMPLATE_PYTHON_SETUP_FILE when launching. Now I get a new error 😅

gcloud dataflow flex-template build gs://${bucket}/e_commerce_batch.json \
--image-gcr-path "${region}-docker.pkg.dev/${proyecto}/${artifact_registry_name}/dataflow/e_commerce_batch:latest" \
--sdk-language "PYTHON" \
--flex-template-base-image "PYTHON3" \
--metadata-file "metadata.json" \
--py-path "." \
--py-path "src/" \
--py-path "src/processors/" \
--env "FLEX_TEMPLATE_PYTHON_PY_FILE=e_commerce_batch.py" \
--env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" 
--env "FLEX_TEMPLATE_PYTHON_SETUP_FILE=setup.py"
 
The new error (from the attached screenshot): sales_target_parse is not defined

sales_target_parse is a function declared in src/processors/functions.py. It's like the main code is not importing the functions in that path.
 
Line 65 of the main file (e_commerce_batch.py) and the body of sales_target_parse in src/processors/functions.py are shown in the attached screenshots.
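
(Roughly, the relevant pieces look like the sketch below; everything other than the import path and sales_target_parse is an assumption, not the code in the screenshots.)

# Illustrative sketch only -- not the code from the screenshots.
import apache_beam as beam
from src.processors.functions import sales_target_parse

with beam.Pipeline() as p:
    (p
     | "ReadSalesTargets" >> beam.Create(["placeholder,row"])  # placeholder input
     | "ParseSalesTargets" >> beam.Map(sales_target_parse))    # the kind of call made around line 65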

 

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost
 

The error message sales_target_parse is not defined indicates that the Dataflow runner cannot find the sales_target_parse function. This is because the src/processors/functions.py module is not imported in the e_commerce_batch.py file.

To fix this error, you need to import the src/processors/functions.py module in the e_commerce_batch.py file. You can do this by adding the following line to the top of the e_commerce_batch.py file:

 
from src.processors.functions import sales_target_parse

Here is an example of how to import the src/processors/functions.py module in the e_commerce_batch.py file:

from src.processors.functions import sales_target_parse

def main():
  # ...

  sales_target_parse(data)

if __name__ == '__main__':
  main()

Once you have made this change, you should be able to run your Dataflow job without errors.

Hello, and thanks for the fast reply. The original post shows that I'm already importing that function. Nevertheless, I did another run with

from src.processors.functions import sales_target_parse

but I got the same error. Could it be something in the Dockerfile I'm missing?
 
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost

 

It is possible that there is something missing in your Dockerfile that is preventing Python from finding the sales_target_parse function.

Here are a few things to check:

  1. Include the src Directory: Make sure that the src directory is included in the COPY statement in your Dockerfile.

  2. Function Definition: Ensure that the sales_target_parse function is defined in a Python file that is included in the COPY statement. Also, make sure it is defined at module level rather than inside an if __name__ == "__main__": block or behind other conditional statements, which would prevent it from being imported.

  3. Module-Level Accessibility: In Python, any function defined at the top level of a module is importable by default. Confirm that nothing in functions.py restricts access to sales_target_parse.

Here is an example of a Dockerfile that you can use to build a Python image that includes the sales_target_parse function:

 
FROM python:3.10-slim

# Copy the dependency list and the source tree into the image.
COPY requirements.txt /requirements.txt
COPY src /src

WORKDIR /src

RUN pip install --no-cache-dir -r /requirements.txt

CMD ["python", "my_script.py"]

Note: The CMD directive specifies the default command to run when the container starts. If your intention is to run a Dataflow job, you might need a different command.

If you are still having problems, you can try adding the following line to your Dockerfile:

 

ENV PYTHONPATH=/src:$PYTHONPATH

This is my current Dockerfile:

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/template
RUN mkdir -p ${WORKDIR}
RUN mkdir -p ${WORKDIR}/src
RUN mkdir -p ${WORKDIR}/src/processors
WORKDIR ${WORKDIR}

ARG PYTHON_PY_FILE=e_commerce_batch.py

COPY src /src
COPY . .

ENV PYTHONPATH ${WORKDIR}
ENV PYTHONPATH=/src:$PYTHONPATH
ENV PYTHONPATH=/src/processors:$PYTHONPATH

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/${PYTHON_PY_FILE}"
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"

# Upgrade pip and pre-download the requirements into the local cache.
RUN pip install --no-cache-dir --upgrade pip \
&& pip download --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
 
I added

ENV PYTHONPATH=/src:$PYTHONPATH

and in the Python code I added the following:

from src.processors.functions import debug, list_of_orders_parse, sales_target_parse, order_details_parse, valid_rows, valid_columns, merge_datasets, merge_datasets2
 
Same error.
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost

I'm glad to hear that you've made some progress. It sounds like you're almost there!

The error message sales_target_parse is not defined still indicates that the Dataflow runner cannot find the sales_target_parse function. This could be due to a few reasons:

  • The src/processors/functions.py module might not be in the PYTHONPATH environment variable.
  • There could be an issue with how the sales_target_parse function is being imported or used.

To troubleshoot this issue:

  1. Check the PYTHONPATH: Ensure that the PYTHONPATH in your Dockerfile or the environment configuration of your Dataflow job includes the src/processors/ directory.

  2. Exporting Functions: If you're using wildcard imports (from module import *), ensure that the src/processors/functions.py file has the following line:

     
    __all__ = ['sales_target_parse']

    This ensures that the sales_target_parse function is available for wildcard imports. However, if you're importing the function directly, this step might not be necessary.

Once you've checked these points, try running your Dataflow job again.

Additional Tips:

  • Consider running the Dataflow job with a single worker for debugging purposes. This can help identify if specific workers are having trouble.
  • Review the Dataflow job logs for any other errors or warnings that might provide more insights.

Same error 😞

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost

EUREKA!

I've solved the issue by adding save_main_session=True in my python code!

See Pickling and Managing the Main Session for more info.

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost

That's fantastic news! I'm glad you found the solution.

The save_main_session option is indeed crucial when using Apache Beam (and by extension, Dataflow) with Python. It ensures that the main session's global context is pickled and made available to the workers, allowing them to access global variables, modules, and functions defined in the main module. Without this, certain dependencies and modules might not be available to the workers, leading to the kind of errors you were seeing.

Thank you for sharing the solution and the reference link. It will be beneficial for others who might encounter a similar issue.

Hey @davidregalado25, can you tell me where to add save_main_session=True in the Python code?

# main.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub
from transform_messages import TransformMessages
import datetime

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Add the service account email using GoogleCloudOptions
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.service_account_email = '<PII removed by staff>'

    # Set the job name
    job_name = f"dataflow-job-{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
    google_cloud_options.job_name = job_name

    # Set the save_main_session option directly on the PipelineOptions object
    pipeline_options.save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        messages = (p
                    | 'ReadFromPubSub' >> ReadFromPubSub(topic='projects/dev-payment-analytics/topics/mysql.dataflow.PAYMENT')
                    | 'TransformMessages' >> beam.ParDo(TransformMessages())
                    | 'WriteToPubSub' >> WriteToPubSub(topic='projects/dev-payment-analytics/topics/dataflow_bigquery_push')
                    )

if __name__ == '__main__':
    run()

It's not working for the above file.

 

Hi @uday_varma 

Did you solve your issue? Your pipeline should look a little bit like this:

(Screenshot of the suggested pipeline code attached.)
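
In text form, a rough sketch (not the screenshot itself): the key change from your snippet is setting save_main_session through SetupOptions instead of assigning it directly on the PipelineOptions object. The Pub/Sub topics and TransformMessages below are taken from your code.

# Rough sketch -- the key change is setting save_main_session through SetupOptions.
import datetime

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    SetupOptions,
    StandardOptions,
)

from transform_messages import TransformMessages  # your module, as in the snippet above


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline_options.view_as(SetupOptions).save_main_session = True  # pickle the main session for the workers

    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.job_name = f"dataflow-job-{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
    # (Service account email omitted here; set it as in your original snippet.)

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadFromPubSub' >> ReadFromPubSub(topic='projects/dev-payment-analytics/topics/mysql.dataflow.PAYMENT')
         | 'TransformMessages' >> beam.ParDo(TransformMessages())
         | 'WriteToPubSub' >> WriteToPubSub(topic='projects/dev-payment-analytics/topics/dataflow_bigquery_push'))


if __name__ == '__main__':
    run()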

--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost