Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Getting only history_id in history().list in whan using Gmail API

I am trying to build a Gmail automation tool where if I send a mail to a user if a user messages back in the same thread the message will automatically pass to the GPT and send the reply in the same thread, but when I am trying to access the history to retrieve new messages it is not detecting new message send by user, it is just printing history_id, not the message sent by the user. P.S I use pub/sub for getting push notifications if there's a new message

 

 

 

 

import os
import base64
from email.mime.text import MIMEText
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from pydantic import BaseModel
from google.auth.transport.requests import Request as GoogleRequest
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from openai import OpenAI
import json 

# Initialize FastAPI app
app = FastAPI()

# Initialize OpenAI client
client = OpenAI(api_key='api_key')

# Scopes for accessing Gmail API
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 
          'https://www.googleapis.com/auth/gmail.send',
          'https://www.googleapis.com/auth/gmail.modify',
          'https://www.googleapis.com/auth/gmail.compose']

# Gmail authentication function
def authenticate_gmail():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(GoogleRequest())
        else:
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=8080)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return build('gmail', 'v1', credentials=creds)

# Function to generate email content using GPT-3.5
def generate_gpt_content(prompt):
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system"   , "content": "You are an AI assistant that generates professional email content."},
                {"role": "user", "content": prompt}
            ])
        return response.choices[0].message.content
    except Exception as e:
        print(f"An error occurred with GPT-3.5: {e}")
        return None

# Function to create a MIME email message
def create_message(to, subject, message_text):
    message = MIMEText(message_text)
    message['to'] = to
    message['subject'] = subject
    raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
    return {'raw': raw}

# Function to send an email message using the Gmail API
def send_message(service, user_id, message):
    try:
        message = service.users().messages().send(userId=user_id, body=message).execute()
        print('Message sent to recipient')
        return message
    except HttpError as error:
        print('An error occurred: %s' % error)
        raise HTTPException(status_code=500, detail="Failed to send message")

# Function to create a reply email message
def create_reply_message(to, subject, message_text, thread_id, message_id):
    message = MIMEText(message_text)
    message['to'] = to
    message['subject'] = f"Re: {subject}"
    message['In-Reply-To'] = message_id
    message['References'] = message_id
    raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
    return {'raw': raw, 'threadId': thread_id}

# Function to set up monitoring for a specific thread using Pub/Sub
def setup_thread_watch(service, thread_id):
    request = {
        'labelIds': ['INBOX'],      
        'topicName': 'projects/gen-lang-client-0493531113/topics/email-notifications',
        'labelFilterAction': 'include',
        'threadId': thread_id  # Monitor only the specific thread
    }
    response = service.users().watch(userId='me', body=request).execute()
    print(f'Push notification setup response: {response}')
    return response

# Endpoint to send the initial email and start monitoring the thread
@app.post("/send-initial-email/")
def send_initial_email(recipient_email: str, subject: str, prompt: str):
    service = authenticate_gmail()
    body = generate_gpt_content(prompt)
    if body:
        message = create_message(recipient_email, subject, body)
        sent_message = send_message(service, 'me', message)
        thread_id = sent_message['threadId']
        setup_thread_watch(service, thread_id)
        return {"message": "Email sent successfully", "thread_id": thread_id}
    else:
        raise HTTPException(status_code=500, detail="Failed to generate email content")

# Webhook endpoint to handle incoming notifications and respond to the client in the same thread
@app.post("/webhook/")
async def webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()
    service = authenticate_gmail()
    
    # Decode the base64 data
    data = payload.get('message', {}).get('data')
    if data:
        decoded_data = base64.b64decode(data).decode('utf-8')
        print(f"Decoded data: {decoded_data}")
        # Convert the decoded data to a dictionary if it's in JSON format
        data_dict = json.loads(decoded_data)
        print(f"Decoded data as dict: {data_dict}")
        history_id = data_dict.get('historyId')
    else:
        history_id = None
    
    if history_id:
        history = service.users().history().list(userId='me', startHistoryId=history_id).execute()
        print(f"History response: {history}")  # Log history response   to console
        print(  history.get('history',[]))
        for record in history.get('history', []):
            for message in record.get('messagesAdded', []):
                message_id = message['message']['id']
                msg = service.users().messages().get(userId='me', id=message_id).execute()
                thread_id = msg['threadId']
                
                # Get the entire thread
                thread = service.users().threads().get(userId='me', id=thread_id).execute()
                print(f"Thread details: {thread}")
                
                # Process the latest message in the thread
                latest_message = thread['messages'][-1]
                reply_prompt = f"Compose a professional reply to this email: {latest_message['snippet']}"
                reply_text = generate_gpt_content(reply_prompt)
                if reply_text:
                    client_email = next(header['value'] for header in latest_message['payload']['headers'] if header['name'] == 'From')
                    reply_msg = create_reply_message(client_email, "Re: Important update", reply_text, thread_id, latest_message['id'])
                    background_tasks.add_task(send_message, service, 'me', reply_msg)
        return {"message": "Reply processed"}
    return {"message": "No relevant updates"}   

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

 

 

 

 

 

0 0 52
0 REPLIES 0
Top Labels in this Space