Hi
I've been trying to launch a Dataflow job with Flex Templates using the Python SDK. The job starts and then fails with the error ModuleNotFoundError: No module named 'src'.
I'll provide some context:
File tree
Dockerfile
setup.py
requirements.txt
metadata.json
e_commerce_batch.py
Then, in Cloud Shell I run the following:
What am I missing? I don't want to move the src.processors code into the main Python file (e_commerce_batch.py) because that would make that file less readable.
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost
UPDATE: I forgot to set the FLEX_TEMPLATE_PYTHON_SETUP_FILE environment variable when launching. Now I get a new error 😅
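(For context, a minimal setup.py for a layout like this typically looks something like the sketch below. The name and version are illustrative, not my exact file; find_packages() only picks up src/ and src/processors/ if each of them contains an __init__.py.)

import setuptools

setuptools.setup(
    name='e-commerce-batch',              # illustrative name
    version='0.1.0',                      # illustrative version
    packages=setuptools.find_packages(),  # finds src/ and src/processors/ when they have __init__.py
    install_requires=[],                  # worker dependencies usually live in requirements.txt
)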
The error message sales_target_parse is not defined indicates that the Dataflow runner cannot find the sales_target_parse function. This is because the src/processors/functions.py module is not imported in the e_commerce_batch.py file.
To fix this error, you need to import the src/processors/functions.py module in the e_commerce_batch.py file. You can do this by adding the following line to the top of e_commerce_batch.py:
from src.processors.functions import sales_target_parse
Once you have imported the src/processors/functions.py module, you will be able to access the sales_target_parse function in the e_commerce_batch.py file. Here is an example of how to import and use the sales_target_parse function in e_commerce_batch.py:
from src.processors.functions import sales_target_parse

def main():
    # ... read or build `data` here ...
    sales_target_parse(data)

if __name__ == '__main__':
    main()
Once you have made this change, you should be able to run your Dataflow job without errors.
Hello and thanks for the fast reply. The original post shows that I'm already importing that function. Nevertheless, I did another run with
It is possible that there is something missing in your Dockerfile that is preventing Python from finding the sales_target_parse function.
Here are a few things to check:
Include the src Directory: Make sure that the src directory is included in the COPY statement in your Dockerfile.
Function Definition: Ensure that the sales_target_parse function is defined in a Python file that is included in the COPY statement. Also, make sure that the sales_target_parse function is not defined under an if __name__ == "__main__": block and is not hidden by any conditional statements, which would prevent it from being imported (see the sketch after this list).
Exporting the Function: In Python, functions defined at module level are importable by default unless they are hidden behind conditional statements. Ensure that the function is accessible and not restricted by any conditions.
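As a quick illustration of that last point, a module shaped like the following sketch (file name taken from the thread, body assumed as a placeholder) can be imported cleanly, because the function is defined at module level rather than inside an if __name__ == "__main__": block:

# src/processors/functions.py (sketch; the real parsing logic is assumed)
def sales_target_parse(element):
    """Parse one raw record into the sales-target structure (placeholder logic)."""
    return element

# No if __name__ == "__main__": guard around the definition, so
# `from src.processors.functions import sales_target_parse` works as expected.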
Here is an example of a Dockerfile that you can use to build a Python image that includes the sales_target_parse function:
FROM python:3.10-slim
COPY src /src
COPY requirements.txt /src/requirements.txt
WORKDIR /src
RUN pip install -r requirements.txt
CMD ["python", "my_script.py"]
Note: The CMD directive specifies the default command to run when the container starts. If your intention is to run a Dataflow job, you might need a different command.
If you are still having problems, you can try adding the following line to your Dockerfile:
ENV PYTHONPATH=/src:$PYTHONPATH
This ensures that when Python tries to import a module, it also looks in the /src directory, which is where your custom modules are located.
Once you have updated your Dockerfile, you can rebuild your image and run it again.
This is my current Dockerfile:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/template
RUN mkdir -p ${WORKDIR}
RUN mkdir -p ${WORKDIR}/src
RUN mkdir -p ${WORKDIR}/src/processors
WORKDIR ${WORKDIR}

ARG PYTHON_PY_FILE=e_commerce_batch.py

COPY src /src
COPY . .

ENV PYTHONPATH ${WORKDIR}
ENV PYTHONPATH=/src:$PYTHONPATH
ENV PYTHONPATH=/src/processors:$PYTHONPATH
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/${PYTHON_PY_FILE}"
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"

# We could get rid of installing libffi-dev and git, or we could leave them.
RUN apt-get update \
    # Upgrade pip and install the requirements.
    && pip install --upgrade pip \
    && pip install --no-cache-dir --upgrade pip \
    && pip download --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
Note that it already includes the line you suggested: ENV PYTHONPATH=/src:$PYTHONPATH
I'm glad to hear that you've made some progress. It sounds like you're almost there!
The error message sales_target_parse is not defined still indicates that the Dataflow runner cannot find the sales_target_parse function. This could be due to a few reasons:
The src/processors/functions.py module might not be on the PYTHONPATH environment variable.
There might be a problem with how the sales_target_parse function is being imported or used.
To troubleshoot this issue:
Check the PYTHONPATH: Ensure that the PYTHONPATH in your Dockerfile or in the environment configuration of your Dataflow job includes the src/processors/ directory.
Exporting Functions: If you're using wildcard imports (from module import *), ensure that the src/processors/functions.py file has the following line:

__all__ = ['sales_target_parse']

This ensures that the sales_target_parse function is available for wildcard imports. However, if you're importing the function directly, this step might not be necessary.
Once you've checked these points, try running your Dataflow job again.
Same error 😞
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost
EUREKA!
I've solved the issue by adding save_main_session=True in my Python code!
See Pickling and Managing the Main Session for more info.
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost
That's fantastic news! I'm glad you found the solution.
The save_main_session option is indeed crucial when using Apache Beam (and by extension, Dataflow) with Python. It ensures that the main session's global context is pickled and made available to the workers, allowing them to access global variables, modules, and functions defined in the main module. Without this, certain dependencies and modules might not be available to the workers, leading to the kind of errors you were seeing.
Thank you for sharing the solution and the reference link. It will be beneficial for others who might encounter a similar issue.
Hey @davidregalado25, can you tell me where to add save_main_session=True in the Python code? It's not working for the above file.
Hi @uday_varma
Did you solve your issue? Your pipeline should look a little bit like this:
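Here's a minimal sketch (illustrative names, not my full pipeline); the key part is setting save_main_session=True on the pipeline options before the pipeline is constructed:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from src.processors.functions import sales_target_parse


def run(argv=None):
    options = PipelineOptions(argv)
    # Pickle the main session so that modules and functions imported here
    # (like sales_target_parse) are available on the Dataflow workers.
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (p
         | 'Read' >> beam.Create(['raw record'])      # illustrative input
         | 'Parse' >> beam.Map(sales_target_parse))


if __name__ == '__main__':
    run()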
--
Best regards
David Regalado
Web | Linkedin | Cloudskillsboost