I am trying to build a Gmail automation tool where if I send a mail to a user if a user messages back in the same thread the message will automatically pass to the GPT and send the reply in the same thread, but when I am trying to access the history to retrieve new messages it is not detecting new message send by user, it is just printing history_id, not the message sent by the user. P.S I use pub/sub for getting push notifications if there's a new message
import os
import base64
from email.mime.text import MIMEText
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from pydantic import BaseModel
from google.auth.transport.requests import Request as GoogleRequest
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from openai import OpenAI
import json
# Initialize FastAPI app
app = FastAPI()
# Initialize OpenAI client
client = OpenAI(api_key='api_key')
# Scopes for accessing Gmail API
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
'https://www.googleapis.com/auth/gmail.compose']
# Gmail authentication function
def authenticate_gmail():
creds = None
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(GoogleRequest())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=8080)
with open('token.json', 'w') as token:
token.write(creds.to_json())
return build('gmail', 'v1', credentials=creds)
# Function to generate email content using GPT-3.5
def generate_gpt_content(prompt):
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system" , "content": "You are an AI assistant that generates professional email content."},
{"role": "user", "content": prompt}
])
return response.choices[0].message.content
except Exception as e:
print(f"An error occurred with GPT-3.5: {e}")
return None
# Function to create a MIME email message
def create_message(to, subject, message_text):
message = MIMEText(message_text)
message['to'] = to
message['subject'] = subject
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
return {'raw': raw}
# Function to send an email message using the Gmail API
def send_message(service, user_id, message):
try:
message = service.users().messages().send(userId=user_id, body=message).execute()
print('Message sent to recipient')
return message
except HttpError as error:
print('An error occurred: %s' % error)
raise HTTPException(status_code=500, detail="Failed to send message")
# Function to create a reply email message
def create_reply_message(to, subject, message_text, thread_id, message_id):
message = MIMEText(message_text)
message['to'] = to
message['subject'] = f"Re: {subject}"
message['In-Reply-To'] = message_id
message['References'] = message_id
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
return {'raw': raw, 'threadId': thread_id}
# Function to set up monitoring for a specific thread using Pub/Sub
def setup_thread_watch(service, thread_id):
request = {
'labelIds': ['INBOX'],
'topicName': 'projects/gen-lang-client-0493531113/topics/email-notifications',
'labelFilterAction': 'include',
'threadId': thread_id # Monitor only the specific thread
}
response = service.users().watch(userId='me', body=request).execute()
print(f'Push notification setup response: {response}')
return response
# Endpoint to send the initial email and start monitoring the thread
@app.post("/send-initial-email/")
def send_initial_email(recipient_email: str, subject: str, prompt: str):
service = authenticate_gmail()
body = generate_gpt_content(prompt)
if body:
message = create_message(recipient_email, subject, body)
sent_message = send_message(service, 'me', message)
thread_id = sent_message['threadId']
setup_thread_watch(service, thread_id)
return {"message": "Email sent successfully", "thread_id": thread_id}
else:
raise HTTPException(status_code=500, detail="Failed to generate email content")
# Webhook endpoint to handle incoming notifications and respond to the client in the same thread
@app.post("/webhook/")
async def webhook(request: Request, background_tasks: BackgroundTasks):
payload = await request.json()
service = authenticate_gmail()
# Decode the base64 data
data = payload.get('message', {}).get('data')
if data:
decoded_data = base64.b64decode(data).decode('utf-8')
print(f"Decoded data: {decoded_data}")
# Convert the decoded data to a dictionary if it's in JSON format
data_dict = json.loads(decoded_data)
print(f"Decoded data as dict: {data_dict}")
history_id = data_dict.get('historyId')
else:
history_id = None
if history_id:
history = service.users().history().list(userId='me', startHistoryId=history_id).execute()
print(f"History response: {history}") # Log history response to console
print( history.get('history',[]))
for record in history.get('history', []):
for message in record.get('messagesAdded', []):
message_id = message['message']['id']
msg = service.users().messages().get(userId='me', id=message_id).execute()
thread_id = msg['threadId']
# Get the entire thread
thread = service.users().threads().get(userId='me', id=thread_id).execute()
print(f"Thread details: {thread}")
# Process the latest message in the thread
latest_message = thread['messages'][-1]
reply_prompt = f"Compose a professional reply to this email: {latest_message['snippet']}"
reply_text = generate_gpt_content(reply_prompt)
if reply_text:
client_email = next(header['value'] for header in latest_message['payload']['headers'] if header['name'] == 'From')
reply_msg = create_reply_message(client_email, "Re: Important update", reply_text, thread_id, latest_message['id'])
background_tasks.add_task(send_message, service, 'me', reply_msg)
return {"message": "Reply processed"}
return {"message": "No relevant updates"}
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)