Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

The issue with Python's threading on Google Cloud Functions

Hi!

I have an issue with Python threading when I run the code as a Google Cloud Function. It looks like wait() on a threading.Event doesn't work. In addition, I observe a lack of synchronization between threads: in my small sample code below, even Python's thread-safe queue.Queue does not appear to be synchronized — items put by the request thread are never seen by the worker thread.
The bug isn't reproducible locally on the same python version (3.11).

Sample code:

Show More
import functools
import logging
import queue
import threading
from typing import Optional

from flask import Flask, Request, request


# Module-wide logger for the sample: DEBUG level, streamed to stderr with a
# verbose format (timestamp, level, file:line, function) so that entries can
# be correlated with source lines in the Cloud Functions log viewer.
logger: logging.Logger = logging.getLogger('thread_test')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(levelname)s] "
                              "%(filename)s:%(lineno)s %(funcName)s(): "
                              "'%(message)s'")
handler.setFormatter(formatter)
logger.addHandler(handler)


class Scheduler:
    """Executes queued jobs on a single background worker thread.

    Jobs are submitted with add_job() and run by ScheduleThread, which
    wakes as soon as a job is posted (event_job) or, at the latest,
    every `interval` seconds.
    """

    class ScheduleThread(threading.Thread):
        def __init__(self, interval: float = 10.0):
            # daemon=True so a lingering worker cannot block interpreter
            # shutdown (__del__ is not guaranteed to ever run).
            super().__init__(daemon=True)
            self.name += " (Scheduler)"
            self._interval = interval
            self.job_queue = queue.Queue()       # pending jobs (partials)
            self.event_job = threading.Event()   # set when a job is queued
            self.event_stop = threading.Event()  # set to request shutdown

        def run(self):
            # getLogger returns the same 'thread_test' logger configured at
            # module level.
            log = logging.getLogger('thread_test')
            while not self.event_stop.is_set():
                # wait() returns True immediately when a job arrives;
                # otherwise it times out after self._interval seconds.
                if self.event_job.wait(self._interval):
                    self.event_job.clear()
                log.debug(id(self.job_queue))
                log.debug(self.job_queue)
                # Drain the queue.  This thread is the only consumer, but
                # guard get() with queue.Empty instead of trusting empty().
                while True:
                    try:
                        job = self.job_queue.get(False)
                    except queue.Empty:
                        break
                    log.debug(job)
                    if isinstance(job, functools.partial):
                        job()

    def __init__(self):
        # NOTE(review): the worker is started at construction time, i.e.
        # from whichever thread builds the Scheduler.  On Cloud Functions
        # gen2 a thread started from the main thread was observed to hang
        # between requests — confirm on the target runtime (lazy start from
        # a request thread avoids it; see the workaround version).
        self._thread = self.ScheduleThread()
        self._thread.start()

    def add_job(self, job_func, *args, **kwargs):
        """Queue job_func(*args, **kwargs) and wake the worker."""
        job = functools.partial(job_func, *args, **kwargs)
        self._thread.job_queue.put(job)
        self._thread.event_job.set()

    def __del__(self):
        # Request shutdown and wake the worker so it exits promptly instead
        # of sleeping out the remainder of its poll interval.
        self._thread.event_stop.set()
        self._thread.event_job.set()


flask_app = Flask("Thread Test")
# Module-level scheduler handle; populated below at import time.
scheduler: Optional[Scheduler] = None


# NOTE(review): this runs at import time, on the main thread, and starts
# the scheduler's worker thread there.  The reported symptom (worker never
# sees queued jobs on Cloud Functions) is tied to this — presumably the
# instance's CPU handling outside request processing; confirm against the
# runtime docs.  The accepted workaround starts the thread lazily instead.
with flask_app.app_context():
    scheduler = Scheduler()


def test_func():
    """Sample job: report the name of the thread that executes it."""
    current = threading.current_thread()
    logger.debug("Thread: %s", current.name)


# Cloud Functions

def app_test(req: Request):
    """HTTP Cloud Function entry point.

    When the module-level scheduler is available, queues test_func for
    execution on the background worker thread.

    Args:
        req (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    """
    logger.debug("")
    sched = scheduler
    if isinstance(sched, Scheduler):
        logger.debug("")
        sched.add_job(test_func)
    logger.debug("")
    return ""


@flask_app.route("/function", methods=["GET", "POST"])
def handle_anything():
    """Local-dev route forwarding /function traffic to the CF handler."""
    logger.debug("")
    response = app_test(request)
    return response


# Local development entry point; the deployed Cloud Function invokes
# app_test directly and never reaches this.
if __name__ == "__main__":
    flask_app.run(host="0.0.0.0", port=3000)

Log (Google Function):

Show More
2023-07-02 17:43:29,951 [DEBUG] main.py:34 run(): '68299283007952'
2023-07-02 17:43:29,951 [DEBUG] main.py:35 run(): '<queue.Queue object at 0x3e1e2a8089d0>'
2023-07-02 17:43:29,951 [DEBUG] main.py:36 run(): '{'maxsize': 0, 'queue': deque([]), 'mutex': <unlocked _thread.lock object at 0x3e1e2a808b40>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'unfinished_tasks': 0}'

0.005671040s,HTTP/1.1,88.112.49.7,GET,946,https://europe-north1-officebot-389014.cloudfunctions.net/app_test,649,216.239.36.54,200,"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
2023-07-02 17:43:32,805 [DEBUG] main.py:89 app_test(): ''
2023-07-02 17:43:32,805 [DEBUG] main.py:91 app_test(): ''
2023-07-02 17:43:32,805 [DEBUG] main.py:47 add_job(): ''
2023-07-02 17:43:32,805 [DEBUG] main.py:49 add_job(): '68299283007952'
2023-07-02 17:43:32,805 [DEBUG] main.py:50 add_job(): '<queue.Queue object at 0x3e1e2a8089d0>'
2023-07-02 17:43:32,805 [DEBUG] main.py:51 add_job(): '{'maxsize': 0, 'queue': deque([]), 'mutex': <unlocked _thread.lock object at 0x3e1e2a808b40>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'unfinished_tasks': 0}'
2023-07-02 17:43:32,806 [DEBUG] main.py:53 add_job(): '{'maxsize': 0, 'queue': deque([functools.partial(<function test_func at 0x3e1e2a7f5da0>)]), 'mutex': <unlocked _thread.lock object at 0x3e1e2a808b40>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'unfinished_tasks': 1}'
2023-07-02 17:43:32,806 [DEBUG] main.py:55 add_job(): ''
2023-07-02 17:43:32,806 [DEBUG] main.py:93 app_test(): ''

2023-07-02 17:43:39,952 [DEBUG] main.py:34 run(): '68299283007952'
2023-07-02 17:43:39,952 [DEBUG] main.py:35 run(): '<queue.Queue object at 0x3e1e2a8089d0>'
2023-07-02 17:43:39,952 [DEBUG] main.py:36 run(): '{'maxsize': 0, 'queue': deque([]), 'mutex': <unlocked _thread.lock object at 0x3e1e2a808b40>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x3e1e2a808b40>, 0)>, 'unfinished_tasks': 0}'​

Log (local):

Show More
2023-07-02 20:41:01,127 [DEBUG] main.py:34 run(): '4394904080'
2023-07-02 20:41:01,128 [DEBUG] main.py:35 run(): '<queue.Queue object at 0x105f4ea10>'
2023-07-02 20:41:01,129 [DEBUG] main.py:36 run(): '{'maxsize': 0, 'queue': deque([]), 'mutex': <unlocked _thread.lock object at 0x1060b2e00>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'unfinished_tasks': 0}'

2023-07-02 20:41:03,559 [DEBUG] main.py:99 handle_anything(): ''
2023-07-02 20:41:03,560 [DEBUG] main.py:89 app_test(): ''
2023-07-02 20:41:03,560 [DEBUG] main.py:91 app_test(): ''
2023-07-02 20:41:03,560 [DEBUG] main.py:47 add_job(): ''
2023-07-02 20:41:03,560 [DEBUG] main.py:49 add_job(): '4394904080'
2023-07-02 20:41:03,560 [DEBUG] main.py:50 add_job(): '<queue.Queue object at 0x105f4ea10>'
2023-07-02 20:41:03,560 [DEBUG] main.py:51 add_job(): '{'maxsize': 0, 'queue': deque([]), 'mutex': <unlocked _thread.lock object at 0x1060b2e00>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'unfinished_tasks': 0}'
2023-07-02 20:41:03,561 [DEBUG] main.py:53 add_job(): '{'maxsize': 0, 'queue': deque([functools.partial(<function test_func at 0x1060a72e0>)]), 'mutex': <unlocked _thread.lock object at 0x1060b2e00>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'unfinished_tasks': 1}'
2023-07-02 20:41:03,561 [DEBUG] main.py:55 add_job(): ''
2023-07-02 20:41:03,561 [DEBUG] main.py:93 app_test(): ''
127.0.0.1 - - [02/Jul/2023 20:41:03] "GET /function HTTP/1.1" 200 -

2023-07-02 20:41:03,566 [DEBUG] main.py:34 run(): '4394904080'
2023-07-02 20:41:03,566 [DEBUG] main.py:35 run(): '<queue.Queue object at 0x105f4ea10>'
2023-07-02 20:41:03,566 [DEBUG] main.py:36 run(): '{'maxsize': 0, 'queue': deque([functools.partial(<function test_func at 0x1060a72e0>)]), 'mutex': <unlocked _thread.lock object at 0x1060b2e00>, 'not_empty': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'not_full': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'all_tasks_done': <Condition(<unlocked _thread.lock object at 0x1060b2e00>, 0)>, 'unfinished_tasks': 1}'
2023-07-02 20:41:03,566 [DEBUG] main.py:38 run(): ''
2023-07-02 20:41:03,566 [DEBUG] main.py:40 run(): 'functools.partial(<function test_func at 0x1060a72e0>)'
2023-07-02 20:41:03,566 [DEBUG] main.py:42 run(): ''
2023-07-02 20:41:03,566 [DEBUG] main.py:74 test_func(): 'Thread: Thread-1 (Scheduler)'
2023-07-02 20:41:03,567 [DEBUG] main.py:44 run(): ''​

 

Solved Solved
0 3 2,278
1 ACCEPTED SOLUTION

I found a workaround to solve this problem. This issue only affects threads created from the main thread. If you create a thread inside the server thread pool (e.g. at the first network request), it works as expected. So, lazy thread initialization is enough.
Code with workaround:

Show More
import functools
import logging
import queue
import threading
from typing import Callable, Optional

from flask import Flask, Request, Response, request


# Module-wide logger for the sample: DEBUG level, streamed to stderr with a
# verbose format (timestamp, level, file:line, function) so that entries can
# be correlated with source lines in the Cloud Functions log viewer.
logger: logging.Logger = logging.getLogger('thread_test')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(levelname)s] "
                              "%(filename)s:%(lineno)s %(funcName)s(): "
                              "'%(message)s'")
handler.setFormatter(formatter)
logger.addHandler(handler)


class Scheduler:
    """Runs queued jobs on a lazily created background worker thread.

    Workaround: the worker is NOT started in __init__.  Threads started
    from the main thread were observed to hang on Cloud Functions, while
    threads started from a request-handling thread behave normally, so
    the worker is created on the first add_job() call instead.
    """

    class ScheduleThread(threading.Thread):
        def __init__(self, interval: float = 10.0) -> None:
            # daemon=True so a lingering worker cannot block interpreter
            # shutdown (__del__ is not guaranteed to ever run).
            super().__init__(daemon=True)
            self.name += " (Scheduler)"
            self._interval = interval
            self.job_queue = queue.Queue()       # pending jobs (partials)
            self.event_job = threading.Event()   # set when a job is queued
            self.event_stop = threading.Event()  # set to request shutdown

        def run(self) -> None:
            # getLogger returns the same 'thread_test' logger configured
            # at module level.
            log = logging.getLogger('thread_test')
            while not self.event_stop.is_set():
                # Wake immediately on a new job; otherwise poll every
                # self._interval seconds.
                if self.event_job.wait(self._interval):
                    self.event_job.clear()
                log.debug(id(self.job_queue))
                # Drain the queue; guard get() with queue.Empty instead of
                # trusting empty().
                while True:
                    try:
                        job = self.job_queue.get(False)
                    except queue.Empty:
                        break
                    log.debug(job)
                    if isinstance(job, functools.partial):
                        job()

    def __init__(self) -> None:
        self._lock = threading.Lock()  # guards lazy thread creation
        self._thread: Optional[Scheduler.ScheduleThread] = None

    def start(self) -> None:
        """Create and start the worker thread."""
        self._thread = self.ScheduleThread()
        self._thread.start()

    def add_job(self, job_func: Callable, *args, **kwargs) -> None:
        """Queue job_func(*args, **kwargs), starting the worker if needed."""
        # Double-checked locking: without the lock, two concurrent first
        # requests could each start a worker thread, orphaning one of them.
        if self._thread is None:
            with self._lock:
                if self._thread is None:
                    self.start()

        job = functools.partial(job_func, *args, **kwargs)
        self._thread.job_queue.put(job)
        self._thread.event_job.set()

    def __del__(self) -> None:
        if self._thread is not None:
            # Request shutdown and wake the worker so it exits promptly
            # instead of sleeping out the remainder of its poll interval.
            self._thread.event_stop.set()
            self._thread.event_job.set()


flask_app = Flask("Thread Test")
# Module-level scheduler handle; populated below at import time.
scheduler: Optional[Scheduler] = None


# Constructing the Scheduler here no longer starts its worker thread;
# the thread is created lazily on the first add_job(), i.e. from a
# request-handling thread, which is the point of the workaround.
with flask_app.app_context():
    scheduler = Scheduler()


def test_func() -> None:
    """Sample job: report the name of the thread that executes it."""
    current = threading.current_thread()
    logger.debug("Thread: %s", current.name)


# Cloud Functions

def app_test(_req: Request) -> Response:
    """HTTP Cloud Function entry point.

    When the module-level scheduler is available, queues test_func for
    execution on the background worker thread.

    Args:
        req (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    """
    logger.debug("")
    sched = scheduler
    if isinstance(sched, Scheduler):
        logger.debug("")
        sched.add_job(test_func)
    logger.debug("")

    return Response()


@flask_app.route("/function", methods=["GET", "POST"])
def handle_anything() -> Response:
    """Local-dev route that forwards /function requests to app_test."""
    logger.debug("")
    return app_test(request)


# Local development entry point; the deployed Cloud Function invokes
# app_test directly and never reaches this.
if __name__ == "__main__":
    flask_app.run(host="0.0.0.0", port=3000)

View solution in original post

3 REPLIES 3

Hi @evgeniy_raketti,

Welcome to Google Cloud Community!

Perhaps you can try replacing queue.Queue with queue.SimpleQueue, which is also a thread-safe queue implementation, and see if that resolves the threading and synchronization issue you were facing.

It's also important to note that Google Cloud Functions is primarily designed for short-lived, stateless functions and may not provide the necessary support for complex threaded applications. Cloud Functions runs your code in a serverless environment where multiple function instances can run simultaneously. The underlying infrastructure may handle incoming requests concurrently, leading to potential interference and a lack of synchronization between threads. As much as possible, try to avoid relying heavily on multi-threading in GCF.

You may look into other compute options provided by Google Cloud Platform, such as Google Cloud Run or Google Kubernetes Engine (GKE), which offer more flexibility and control over the execution environment and may better support multi-threaded applications.

Hope this helps.

Unfortunately, the replacement of queue.Queue with queue.SimpleQueue doesn't change the described behaviour. So, it looks like you shouldn't touch threading at all if you use Google Functions. Thanks!

I found a workaround to solve this problem. This issue only affects threads created from the main thread. If you create a thread inside the server thread pool (e.g. at the first network request), it works as expected. So, lazy thread initialization is enough.
Code with workaround:

Show More
import functools
import logging
import queue
import threading
from typing import Callable, Optional

from flask import Flask, Request, Response, request


# Module-wide logger for the sample: DEBUG level, streamed to stderr with a
# verbose format (timestamp, level, file:line, function) so that entries can
# be correlated with source lines in the Cloud Functions log viewer.
logger: logging.Logger = logging.getLogger('thread_test')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s [%(levelname)s] "
                              "%(filename)s:%(lineno)s %(funcName)s(): "
                              "'%(message)s'")
handler.setFormatter(formatter)
logger.addHandler(handler)


class Scheduler:
    """Runs queued jobs on a lazily created background worker thread.

    Workaround: the worker is NOT started in __init__.  Threads started
    from the main thread were observed to hang on Cloud Functions, while
    threads started from a request-handling thread behave normally, so
    the worker is created on the first add_job() call instead.
    """

    class ScheduleThread(threading.Thread):
        def __init__(self, interval: float = 10.0) -> None:
            # daemon=True so a lingering worker cannot block interpreter
            # shutdown (__del__ is not guaranteed to ever run).
            super().__init__(daemon=True)
            self.name += " (Scheduler)"
            self._interval = interval
            self.job_queue = queue.Queue()       # pending jobs (partials)
            self.event_job = threading.Event()   # set when a job is queued
            self.event_stop = threading.Event()  # set to request shutdown

        def run(self) -> None:
            # getLogger returns the same 'thread_test' logger configured
            # at module level.
            log = logging.getLogger('thread_test')
            while not self.event_stop.is_set():
                # Wake immediately on a new job; otherwise poll every
                # self._interval seconds.
                if self.event_job.wait(self._interval):
                    self.event_job.clear()
                log.debug(id(self.job_queue))
                # Drain the queue; guard get() with queue.Empty instead of
                # trusting empty().
                while True:
                    try:
                        job = self.job_queue.get(False)
                    except queue.Empty:
                        break
                    log.debug(job)
                    if isinstance(job, functools.partial):
                        job()

    def __init__(self) -> None:
        self._lock = threading.Lock()  # guards lazy thread creation
        self._thread: Optional[Scheduler.ScheduleThread] = None

    def start(self) -> None:
        """Create and start the worker thread."""
        self._thread = self.ScheduleThread()
        self._thread.start()

    def add_job(self, job_func: Callable, *args, **kwargs) -> None:
        """Queue job_func(*args, **kwargs), starting the worker if needed."""
        # Double-checked locking: without the lock, two concurrent first
        # requests could each start a worker thread, orphaning one of them.
        if self._thread is None:
            with self._lock:
                if self._thread is None:
                    self.start()

        job = functools.partial(job_func, *args, **kwargs)
        self._thread.job_queue.put(job)
        self._thread.event_job.set()

    def __del__(self) -> None:
        if self._thread is not None:
            # Request shutdown and wake the worker so it exits promptly
            # instead of sleeping out the remainder of its poll interval.
            self._thread.event_stop.set()
            self._thread.event_job.set()


flask_app = Flask("Thread Test")
# Module-level scheduler handle; populated below at import time.
scheduler: Optional[Scheduler] = None


# Constructing the Scheduler here no longer starts its worker thread;
# the thread is created lazily on the first add_job(), i.e. from a
# request-handling thread, which is the point of the workaround.
with flask_app.app_context():
    scheduler = Scheduler()


def test_func() -> None:
    """Sample job: report the name of the thread that executes it."""
    current = threading.current_thread()
    logger.debug("Thread: %s", current.name)


# Cloud Functions

def app_test(_req: Request) -> Response:
    """HTTP Cloud Function entry point.

    When the module-level scheduler is available, queues test_func for
    execution on the background worker thread.

    Args:
        req (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    """
    logger.debug("")
    sched = scheduler
    if isinstance(sched, Scheduler):
        logger.debug("")
        sched.add_job(test_func)
    logger.debug("")

    return Response()


@flask_app.route("/function", methods=["GET", "POST"])
def handle_anything() -> Response:
    """Local-dev route that forwards /function requests to app_test."""
    logger.debug("")
    return app_test(request)


# Local development entry point; the deployed Cloud Function invokes
# app_test directly and never reaches this.
if __name__ == "__main__":
    flask_app.run(host="0.0.0.0", port=3000)