gemini-resume-checker/app.py

242 lines
8.6 KiB
Python

import os
import re
import asyncio
import secrets
import uuid
from flask import Flask, render_template, request, jsonify
from werkzeug.utils import secure_filename
from flask_wtf.csrf import CSRFProtect
from pdf2image import convert_from_path
from PIL import Image
import fitz # PyMuPDF for PDF validation
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import google.generativeai as genai
import requests
from dotenv import load_dotenv
import markdown
# Load environment variables from a local .env file before any os.getenv()
# call below reads them.
load_dotenv()

# Configure Gemini API. Fail fast at import time: the app cannot do anything
# useful without an API key.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is required but not set")
genai.configure(api_key=GOOGLE_API_KEY)
model_img = genai.GenerativeModel('gemini-2.0-flash-lite-preview')

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = os.getenv("UPLOAD_FOLDER", 'uploads/')
# A set rather than the bare string "pdf": `ext in "pdf"` is a substring test
# and would also accept "p", "df" or the empty extension.
app.config['ALLOWED_EXTENSIONS'] = {"pdf"}
app.config['MAX_FILE_SIZE'] = int(os.getenv("MAX_FILE_SIZE", 4 * 1024 * 1024))  # 4MB limit
# Read SECRET_KEY itself. The previous nested os.getenv(os.getenv(...)) used
# the secret's *value* as a variable name and raised TypeError when the
# variable was unset. The random fallback changes on every start, so sessions
# and CSRF tokens do not survive a restart.
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY") or secrets.token_hex(32)

MAX_TEXT_LENGTH = 2000
RECAPTCHA_SECRET_KEY = os.getenv("RECAPTCHA_SECRET_KEY", "")
RECAPTCHA_SITE_KEY = os.getenv("RECAPTCHA_SITE_KEY", "")

# Limit to 5 concurrent tasks.
# NOTE(review): asyncio primitives are tied to a single event loop; confirm
# this behaves as intended with the way Flask runs async views.
semaphore = asyncio.Semaphore(5)

# Initialize Flask-Limiter
limiter = Limiter(
    get_remote_address,  # Limits requests based on client IP
    app=app,
    default_limits=[os.getenv("RATE_LIMIT", "1 per 10 seconds")])
# Answer rate-limited requests with an empty 444 instead of a 429 page.
@app.errorhandler(429)
def rate_limit_exceeded(e):
    """Handle a 429 error by returning an empty body with status 444.

    Args:
        e: The error passed in by Flask; it is not inspected.

    Returns:
        A ``(body, status)`` tuple: an empty string and 444
        (Nginx-style "No Response").
    """
    empty_body, no_response_status = "", 444
    return empty_body, no_response_status
# CSRF Protection
csrf = CSRFProtect(app)
# CORS with strict policy: only the origin named in CORS_ORIGIN is allowed.
CORS(app, resources={r"/*": {"origins": os.getenv("CORS_ORIGIN", "")}})
# exist_ok=True replaces the separate os.path.exists() check, which could
# race with another process creating the same folder between check and
# creation.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def allowed_file(filename):
    """Return True when *filename* has an allowed extension.

    The extension (text after the last dot) is compared case-insensitively
    for exact equality with the entries of
    ``app.config['ALLOWED_EXTENSIONS']``. That setting may be a collection of
    extensions or a single (optionally comma-separated) string; a plain
    string is normalised first so that ``ext in "pdf"`` can no longer act as
    a substring test that lets ``x.p``, ``x.df`` or ``x.`` through.

    Args:
        filename: Client-supplied file name; may be empty or None.

    Returns:
        bool: True if the name contains a dot and its extension is allowed.
    """
    if not filename:
        return False

    configured = app.config['ALLOWED_EXTENSIONS']
    if isinstance(configured, str):
        configured = configured.split(',')
    allowed = {
        ext.strip().lower().lstrip('.')
        for ext in configured
        if ext.strip()
    }

    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in allowed
def sanitize_text(text):
    """Strip disallowed characters from *text* and cap its length.

    Only ASCII letters, digits, spaces, ``. , ! ?`` and CR/LF are kept;
    every other character is removed. The result is then truncated to
    ``MAX_TEXT_LENGTH`` characters.

    Args:
        text: The raw string to clean.

    Returns:
        str: The filtered, truncated string.
    """
    disallowed = re.compile(r'[^a-zA-Z0-9 .,!?\n\r]')
    return disallowed.sub('', text)[:MAX_TEXT_LENGTH]
import fitz # PyMuPDF
import fitz # PyMuPDF
def is_valid_pdf(pdf_path):
    """Check that the file at *pdf_path* is a PDF without risky elements.

    The checks, in order:
      1. The file starts with the ``%PDF`` magic bytes.
      2. No annotation carries a "JS" or "AA" entry in its info dict.
      3. No link is a launch action or points into another file. Plain
         URI links are allowed.

    Any exception (unreadable file, parse error, ...) is reported on stdout
    and treated as "not valid", so the check fails closed.

    Args:
        pdf_path: Path of the file to inspect.

    Returns:
        bool: True if every check passed, False otherwise.
    """
    try:
        # Check if the file starts with "%PDF" magic bytes
        with open(pdf_path, "rb") as f:
            if f.read(4) != b"%PDF":
                print("Error: File is not a valid PDF.")
                return False

        # LINK_LAUNCH is the kind PyMuPDF reports for launch actions; the
        # previous check only tested 5 and 6 and let launch links through.
        # 5 (LINK_GOTOR) and 6 are kept so nothing formerly rejected is now
        # accepted.
        blocked_link_kinds = (fitz.LINK_LAUNCH, fitz.LINK_GOTOR, 6)

        # The context manager closes the document on every exit path, so the
        # caller can delete the file right after a rejection.
        with fitz.open(pdf_path) as doc:
            for page in doc:
                # Check for JavaScript in annotations
                # NOTE(review): confirm that PyMuPDF really exposes JS/AA
                # entries through Annot.info.
                for ann in page.annots() or []:
                    if ann.info.get("JS") or ann.info.get("AA"):  # JavaScript actions
                        print("Error: PDF contains JavaScript.")
                        return False

                # Form fields (which can contain scripts) are deliberately not
                # rejected. The disabled check was:
                #     if page.widgets():
                #         print("Error: PDF contains form fields (potentially unsafe).")
                #         return False

                # Check for links that launch or reference external files
                for link in page.get_links():
                    kind = link.get("kind")
                    if kind == fitz.LINK_URI:  # Allow external links
                        continue
                    if kind in blocked_link_kinds:
                        print("Error: PDF contains embedded files or launch actions.")
                        return False

        print("✅ PDF is valid and safe (contains no JavaScript or harmful elements).")
        return True
    except Exception as e:
        # Deliberately broad: any failure while inspecting untrusted input
        # means the file must be rejected.
        print(f"Error processing PDF: {e}")
        return False
def format_summary(summary):
    """Convert a Markdown-formatted summary into HTML.

    Args:
        summary: Markdown text.

    Returns:
        The value returned by ``markdown.markdown`` for that text.
    """
    rendered_html = markdown.markdown(summary)
    return rendered_html
def resize_image(image, max_size=1080):
    """Shrink *image* so that neither side exceeds *max_size* pixels.

    The aspect ratio is kept: both sides are scaled by the same factor and
    truncated to integers. Images that already fit are returned unchanged
    (the same object, not a copy).

    Args:
        image: An object with a ``size`` pair and a ``resize`` method.
        max_size: Maximum allowed length of the longer side, in pixels.

    Returns:
        The original image if it fits, otherwise a LANCZOS-resampled copy.
    """
    width, height = image.size
    longest_side = max(width, height)

    # Guard clause: nothing to do when the image already fits.
    if longest_side <= max_size:
        return image

    ratio = max_size / longest_side
    target_size = (int(width * ratio), int(height * ratio))
    return image.resize(target_size, Image.Resampling.LANCZOS)
def stitch_images(image_paths):
    """Stitch multiple images into a single vertical image.

    The images are pasted top to bottom, left-aligned, onto a new RGB canvas
    as wide as the widest image and as tall as all images combined. The
    result is then passed through ``resize_image`` with ``max_size=1080``.

    Args:
        image_paths: Paths of the images to stack, in top-to-bottom order.

    Returns:
        The stitched (and possibly downscaled) image.

    Raises:
        ValueError: If *image_paths* is empty.
    """
    image_paths = list(image_paths)
    if not image_paths:
        # Previously this surfaced as an opaque "max() arg is an empty
        # sequence" ValueError; same exception type, clearer message.
        raise ValueError("stitch_images() requires at least one image path")

    images = []
    try:
        for img_path in image_paths:
            images.append(Image.open(img_path))

        # Get total height & max width
        max_width = max(img.width for img in images)
        total_height = sum(img.height for img in images)

        # Create a blank image with the combined height
        stitched_image = Image.new("RGB", (max_width, total_height))

        # Paste images sequentially
        y_offset = 0
        for img in images:
            stitched_image.paste(img, (0, y_offset))
            y_offset += img.height
    finally:
        # Close every source image so its file handle is released even when
        # opening or pasting fails part-way; callers delete these files next.
        for img in images:
            img.close()

    # Resize if needed
    return resize_image(stitched_image, max_size=1080)
def pdf_to_images(pdf_path, max_pages=2):
    """Convert the first pages of a PDF into one stitched image.

    Pages 1..*max_pages* are rendered with ``convert_from_path``, written as
    temporary PNG files next to the PDF (``<pdf_path>_page_<n>.png``),
    combined with ``stitch_images`` and then deleted.

    Args:
        pdf_path: Path of the PDF to render.
        max_pages: Number of leading pages to include.

    Returns:
        The image returned by ``stitch_images``.
    """
    images = convert_from_path(pdf_path, first_page=1, last_page=max_pages)
    image_paths = []
    try:
        for page_number, image in enumerate(images, start=1):
            image_path = f"{pdf_path}_page_{page_number}.png"
            # Record the path before saving so a partially written file is
            # cleaned up as well.
            image_paths.append(image_path)
            image.save(image_path, "PNG")

        # Stitch images into one
        return stitch_images(image_paths)
    finally:
        # Cleanup individual images, also when saving or stitching raised;
        # otherwise the temporary PNGs would pile up in the upload folder.
        for img_path in image_paths:
            try:
                os.remove(img_path)
            except FileNotFoundError:
                pass
async def summarize_pdf(pdf_file, jobdesc_text):
    """Ask the Gemini model how well a CV matches a job description.

    The job description is sanitized, a scoring question is appended, the
    PDF is rendered to a single image and both are sent to the model. The
    PDF is deleted afterwards whether or not processing succeeded.

    Args:
        pdf_file: Path of the uploaded PDF; the file is removed by this call.
        jobdesc_text: Raw job description text.

    Returns:
        The model's answer passed through ``format_summary``.

    Raises:
        Whatever the PDF conversion or the model call raises; the upload is
        removed before the exception propagates.
    """
    try:
        async with semaphore:  # Limit concurrency to 5
            jobdesc_text = sanitize_text(jobdesc_text)
            jobdesc_text += "\nHow compatible my CV is with this jobdesc? In a scale of 100 how would you score it?"
            # Convert PDF to image in a worker thread
            image = await asyncio.to_thread(pdf_to_images, pdf_file, 2)
            # Run AI inference in a separate thread to avoid blocking
            summary = await asyncio.to_thread(model_img.generate_content, [jobdesc_text, image])
    finally:
        # Delete the upload on success *and* on failure; previously an error
        # in conversion or inference left the user's PDF on disk.
        try:
            os.remove(pdf_file)
        except FileNotFoundError:
            pass
    return format_summary(summary.text)
@app.after_request
def add_security_headers(response):
    """Attach the security headers to every outgoing response.

    Sets X-Frame-Options, X-Content-Type-Options, Referrer-Policy and a
    Content-Security-Policy. The policy defaults to 'self', lists the Google,
    gstatic, recaptcha.net, jQuery and jsDelivr hosts per directive, and
    sets object-src to 'none'.

    Args:
        response: The response object about to be sent.

    Returns:
        The same response object, with the headers set.
    """
    content_security_policy = "".join((
        "default-src 'self'; ",
        "script-src 'self' https://www.google.com https://www.gstatic.com https://code.jquery.com https://cdn.jsdelivr.net https://www.recaptcha.net; ",
        "style-src 'self' https://cdn.jsdelivr.net; ",
        "img-src 'self' data: https://www.google.com https://www.gstatic.com; ",
        "frame-src 'self' https://www.google.com https://www.recaptcha.net; ",
        "connect-src 'self' https://www.google.com https://www.gstatic.com https://www.recaptcha.net; ",
        "object-src 'none'; ",
    ))
    security_headers = {
        'X-Frame-Options': 'DENY',
        'X-Content-Type-Options': 'nosniff',
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        'Content-Security-Policy': content_security_policy,
    }
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value
    return response
@app.route('/', methods=['GET'])
def upload_page():
    """Render the upload form.

    Returns:
        The rendered ``upload.html`` template, given the reCAPTCHA site key
        as ``recaptcha_site_key``.
    """
    template_context = {"recaptcha_site_key": RECAPTCHA_SITE_KEY}
    return render_template('upload.html', **template_context)
@app.route('/', methods=['POST'])
@limiter.limit("1 per 10 seconds")
async def process_upload():
    """Handle a CV upload: verify reCAPTCHA, validate input, return the analysis.

    Expects a form with ``g-recaptcha-response``, ``text_input`` (the job
    description) and a ``file`` part containing a PDF.

    Returns:
        On success, JSON ``{"summary": <html>, "score": <int or None>}``,
        where the score is the first "<n>/100" or "<n> out of 100" found in
        the summary. On failure, JSON ``{"error": <message>}`` with status
        400 (bad input), 503 (reCAPTCHA service unreachable) or 500
        (analysis failed).
    """
    recaptcha_response = request.form.get('g-recaptcha-response')
    try:
        # A timeout keeps a stalled verification service from hanging the
        # request indefinitely.
        recaptcha_verification = requests.post(
            "https://www.google.com/recaptcha/api/siteverify",
            data={"secret": RECAPTCHA_SECRET_KEY, "response": recaptcha_response},
            timeout=10,
        ).json()
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON reply: fail closed with a clean error
        # instead of an unhandled exception.
        return jsonify({"error": "reCAPTCHA verification is temporarily unavailable."}), 503
    if not recaptcha_verification.get("success"):
        return jsonify({"error": "reCAPTCHA verification failed."}), 400

    text_input = sanitize_text(request.form.get('text_input', '').strip())
    file = request.files.get('file')
    # Both inputs are required, as the message says. The previous `and` only
    # rejected requests where *both* were missing.
    if not text_input or not file:
        return jsonify({"error": "Job description and PDF file are required."}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": "Invalid file format."}), 400

    # Measure the upload itself: content_length is usually 0 for multipart
    # file parts, so checking it alone never enforced the limit.
    upload_stream = file.stream
    upload_stream.seek(0, os.SEEK_END)
    upload_size = upload_stream.tell()
    upload_stream.seek(0)
    if upload_size > app.config['MAX_FILE_SIZE']:
        return jsonify({"error": "File size exceeds 4MB limit."}), 400

    # Store under a random name; the client-supplied name is never used.
    filename = f"{uuid.uuid4().hex}.pdf"
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)
    if not is_valid_pdf(filepath):
        os.remove(filepath)
        return jsonify({"error": "Invalid or potentially harmful PDF."}), 400

    try:
        summary = await summarize_pdf(filepath, text_input)
    except Exception:
        # Request boundary: log the failure, make sure the upload does not
        # linger on disk, and answer with JSON like every other error path.
        app.logger.exception("Resume analysis failed")
        try:
            os.remove(filepath)
        except FileNotFoundError:
            pass
        return jsonify({"error": "Failed to analyze the resume. Please try again later."}), 500

    score_match = re.search(r"(\d{1,3})\s*(?:\/|out of)\s*100", summary, re.IGNORECASE)
    score = int(score_match.group(1)) if score_match else None
    return jsonify({"summary": summary, "score": score})
if __name__ == '__main__':
    # Bind address and port come from the environment, defaulting to all
    # interfaces on port 49465; debug mode stays off.
    bind_host = os.getenv("FLASK_RUN_HOST", "0.0.0.0")
    bind_port = int(os.getenv("FLASK_RUN_PORT", 49465))
    app.run(host=bind_host, port=bind_port, debug=False)