mirror of
https://github.com/gabrielkheisa/gemini-resume-checker.git
synced 2025-02-23 01:14:12 +07:00
288 lines
11 KiB
Python
288 lines
11 KiB
Python
|
import os
|
||
|
import re
|
||
|
import asyncio
|
||
|
import secrets
|
||
|
import uuid
|
||
|
from flask import Flask, render_template, request, jsonify
|
||
|
from werkzeug.utils import secure_filename
|
||
|
from flask_wtf.csrf import CSRFProtect
|
||
|
from pdf2image import convert_from_path
|
||
|
from PIL import Image
|
||
|
import fitz # PyMuPDF for PDF validation
|
||
|
from flask_cors import CORS
|
||
|
from flask_limiter import Limiter
|
||
|
from flask_limiter.util import get_remote_address
|
||
|
import google.generativeai as genai
|
||
|
import requests
|
||
|
from dotenv import load_dotenv
|
||
|
|
||
|
# Load environment variables
load_dotenv()

# Configure Gemini API. The key is mandatory, so fail at import time rather
# than on the first request.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is required but not set")
genai.configure(api_key=GOOGLE_API_KEY)
model_img = genai.GenerativeModel('gemini-2.0-flash-lite-preview')

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = os.getenv("UPLOAD_FOLDER", 'uploads/')
# A set rather than the bare string "pdf": `ext in "pdf"` is a substring test
# and would also accept "p", "df" or an empty extension.
app.config['ALLOWED_EXTENSIONS'] = {"pdf"}
app.config['MAX_FILE_SIZE'] = int(os.getenv("MAX_FILE_SIZE", 4 * 1024 * 1024))  # 4MB limit
# Read SECRET_KEY directly. The previous nested os.getenv() call used the
# secret's *value* as a variable name, so a configured key was ignored and an
# unset key raised TypeError. The random fallback changes on every start.
app.config['SECRET_KEY'] = os.getenv("SECRET_KEY") or secrets.token_hex(32)

# Maximum number of characters kept from the submitted job description.
MAX_TEXT_LENGTH = 2000

RECAPTCHA_SECRET_KEY = os.getenv("RECAPTCHA_SECRET_KEY", "")
RECAPTCHA_SITE_KEY = os.getenv("RECAPTCHA_SITE_KEY", "")

# NOTE(review): Flask may run each async view on its own event loop; confirm
# that a module-level asyncio.Semaphore really limits work across requests.
semaphore = asyncio.Semaphore(5)  # Limit to 5 concurrent tasks

# Initialize Flask-Limiter
limiter = Limiter(
    get_remote_address,  # Limits requests based on client IP
    app=app,
    default_limits=[os.getenv("RATE_LIMIT", "1 per 10 seconds")])
|
||
|
|
||
|
|
||
|
# Custom function to reject requests with 444 status code
|
||
|
@app.errorhandler(429)
def rate_limit_exceeded(e):
    """Answer a rate-limited (429) request with an empty body and status 444."""
    empty_body, status_code = "", 444  # Nginx-style "No Response" error
    return empty_body, status_code
|
||
|
|
||
|
# CSRF Protection
csrf = CSRFProtect(app)

# CORS with strict policy: only the origin named by CORS_ORIGIN is allowed.
CORS(app, resources={r"/*": {"origins": os.getenv("CORS_ORIGIN", "")}})

# Create the upload directory up front. exist_ok=True replaces the separate
# exists() check, which could race with another process creating the folder.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
|
||
|
|
||
|
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The comparison is case-insensitive and uses the text after the last dot.
    A missing or empty filename, or one without a dot, is rejected.
    """
    if not filename:
        return False

    allowed = app.config['ALLOWED_EXTENSIONS']
    if isinstance(allowed, str):
        # A bare string would turn `in` into a substring test ("p" in "pdf"),
        # so treat it as a single allowed extension.
        allowed = {allowed}

    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in allowed
|
||
|
|
||
|
def sanitize_text(text):
    """Strip disallowed characters from *text* and cap its length.

    Only ASCII letters, digits, spaces, the punctuation ``. , ! ?`` and
    newline / carriage-return characters are kept. The result is truncated
    to MAX_TEXT_LENGTH characters.
    """
    disallowed = re.compile(r'[^a-zA-Z0-9 .,!?\n\r]')
    cleaned = disallowed.sub('', text)
    return cleaned[:MAX_TEXT_LENGTH]
|
||
|
|
||
|
import fitz # PyMuPDF
|
||
|
|
||
|
import fitz # PyMuPDF
|
||
|
|
||
|
def is_valid_pdf(pdf_path):
    """Return True when the file at *pdf_path* passes the PDF safety checks.

    The file must start with the ``%PDF`` magic bytes, and none of its pages
    may carry annotations flagged as JavaScript or links of a rejected kind.
    Any error while reading or parsing the file is reported and treated as
    "not valid", so this function never raises for a bad upload.
    """
    # PyMuPDF link kinds: 2 is a plain URI link. 3 is a launch action and
    # 5 a "go to remote file" link. 6 is kept because the original rejected it.
    external_link_kind = 2
    rejected_link_kinds = (3, 5, 6)

    try:
        # Check if the file starts with "%PDF" magic bytes
        with open(pdf_path, "rb") as f:
            if f.read(4) != b"%PDF":
                print("Error: File is not a valid PDF.")
                return False

        # The context manager closes the document on every exit path; the
        # original left the handle open.
        with fitz.open(pdf_path) as doc:
            for page in doc:
                # Check for JavaScript in annotations
                # NOTE(review): confirm that annotation info dictionaries
                # really expose "JS"/"AA" keys; otherwise this check is a no-op.
                for ann in page.annots() or []:
                    if ann.info.get("JS") or ann.info.get("AA"):  # JavaScript actions
                        print("Error: PDF contains JavaScript.")
                        return False

                # The form-field check (page.widgets()) is intentionally
                # disabled, so PDFs containing form fields are accepted.

                # Check links: external links are allowed, launch actions and
                # links into other files are not.
                for link in page.get_links():
                    kind = link.get("kind")
                    if kind == external_link_kind:  # Allow external links
                        continue
                    if kind in rejected_link_kinds:
                        print("Error: PDF contains embedded files or launch actions.")
                        return False

        print("✅ PDF is valid and safe (contains no JavaScript or harmful elements).")
        return True

    except Exception as e:
        # Fail closed: anything that cannot be parsed is rejected.
        print(f"Error processing PDF: {e}")
        return False
|
||
|
|
||
|
def format_summary(summary):
    """Convert markdown-like text to HTML, including nested lists.

    Handles ``**bold**``, ``*italic*``, unordered items (``* item``) and
    ordered items (``1. item``). Four characters of leading whitespace count
    as one extra nesting level. HTML special characters in the input are
    escaped first, so the only markup in the result is what this function
    generates.

    Returns the converted lines joined with newlines.
    """
    import html  # local import keeps this block self-contained

    # The text comes from the model and is rendered as HTML, so neutralise
    # any markup it contains before adding our own tags.
    summary = html.escape(summary, quote=False)

    summary = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', summary)  # Bold text
    summary = re.sub(r'\*(?!\s)(.*?)\*', r'<em>\1</em>', summary)  # Italic text (ignoring lists)

    formatted_lines = []
    list_stack = []  # Tags ('ul' / 'ol') of the currently open lists, outermost first

    def close_innermost_list():
        formatted_lines.append('</{}>'.format(list_stack.pop()))

    def open_list(tag):
        formatted_lines.append('<{}>'.format(tag))
        list_stack.append(tag)

    for line in summary.split('\n'):
        unordered_match = re.match(r'^(\s*)\*\s(.+)', line)  # Matches "* item"
        ordered_match = re.match(r'^(\s*)(\d+)\.\s(.+)', line)  # Matches "1. item"

        if unordered_match:
            indent, item_text, tag = unordered_match.group(1), unordered_match.group(2), 'ul'
        elif ordered_match:
            indent, item_text, tag = ordered_match.group(1), ordered_match.group(3), 'ol'
        else:
            # Close any open lists before adding non-list content
            while list_stack:
                close_innermost_list()
            formatted_lines.append(line)
            continue

        # Depth 1 is a top-level item. The original used depth 0 here, so
        # top-level items were emitted as bare <li> without a <ul>/<ol>.
        depth = len(indent) // 4 + 1  # Assume 4 spaces per indent level

        # Close lists that are deeper than this item
        while len(list_stack) > depth:
            close_innermost_list()

        # Open new lists until this item's depth is reached
        while len(list_stack) < depth:
            open_list(tag)

        # Handle list type switching (unordered <-> ordered) at the same depth
        if list_stack[-1] != tag:
            close_innermost_list()
            open_list(tag)

        formatted_lines.append('<li>{}</li>'.format(item_text))

    # Close any remaining lists
    while list_stack:
        close_innermost_list()

    return '\n'.join(formatted_lines)
|
||
|
|
||
|
def resize_image(image, max_size=1080):
    """Shrink *image* so that neither side exceeds *max_size* pixels.

    The aspect ratio is preserved. An image that already fits is returned
    unchanged (the same object); otherwise a resized copy is returned.
    """
    width, height = image.size
    longest_side = max(width, height)
    if longest_side <= max_size:
        return image

    scaling_factor = max_size / longest_side
    # Clamp to 1 pixel: a very narrow or flat image could otherwise round
    # down to a zero-sized dimension.
    new_size = (
        max(1, int(width * scaling_factor)),
        max(1, int(height * scaling_factor)),
    )
    return image.resize(new_size, Image.Resampling.LANCZOS)
|
||
|
|
||
|
def stitch_images(image_paths):
    """Stitch the images at *image_paths* into a single vertical image.

    Images are pasted top to bottom, left-aligned, onto an RGB canvas as wide
    as the widest image. The result is passed through resize_image() with a
    1080-pixel limit.

    Raises:
        ValueError: if *image_paths* is empty.
    """
    image_paths = list(image_paths)
    if not image_paths:
        raise ValueError("stitch_images() requires at least one image path")

    images = []
    try:
        for img_path in image_paths:
            images.append(Image.open(img_path))

        # Get total height & max width
        max_width = max(img.width for img in images)
        total_height = sum(img.height for img in images)

        # Create a blank image with the combined height
        stitched_image = Image.new("RGB", (max_width, total_height))

        # Paste images sequentially
        y_offset = 0
        for img in images:
            stitched_image.paste(img, (0, y_offset))
            y_offset += img.height
    finally:
        # Release the file handles so callers can delete the source files;
        # the original never closed them.
        for img in images:
            img.close()

    # Resize if needed
    return resize_image(stitched_image, max_size=1080)
|
||
|
|
||
|
|
||
|
def pdf_to_images(pdf_path, max_pages=2):
    """Render the first *max_pages* pages of a PDF and stitch them into one image.

    Each page is written to a temporary ``<pdf_path>_page_<n>.png`` file next
    to the PDF, the files are stitched with stitch_images(), and the
    temporary files are removed again even if saving or stitching fails.

    Returns the stitched image.
    """
    images = convert_from_path(pdf_path, first_page=1, last_page=max_pages)

    image_paths = []
    try:
        for page_number, image in enumerate(images, start=1):
            image_path = f"{pdf_path}_page_{page_number}.png"
            # Record the path before saving so a partially written file is
            # still cleaned up.
            image_paths.append(image_path)
            image.save(image_path, "PNG")

        # Stitch images into one
        return stitch_images(image_paths)
    finally:
        # Cleanup individual images
        for img_path in image_paths:
            try:
                os.remove(img_path)
            except FileNotFoundError:
                pass  # never written, nothing to clean up
|
||
|
|
||
|
async def summarize_pdf(pdf_file, jobdesc_text):
    """Ask the Gemini model how well the CV in *pdf_file* matches a job description.

    The job description is sanitized and extended with the scoring question,
    the PDF is rendered to a single image, and both are sent to the model.
    The uploaded PDF is always deleted afterwards, including when conversion
    or inference fails.

    Returns the model's answer converted to HTML by format_summary().
    """
    async with semaphore:  # Limit concurrency to 5
        try:
            jobdesc_text = sanitize_text(jobdesc_text)
            jobdesc_text += "\nHow compatible my CV is with this jobdesc? In a scale of 100 how would you score it?"

            # Convert PDF to image in a worker thread
            image = await asyncio.to_thread(pdf_to_images, pdf_file, 2)

            # Run AI inference in a separate thread to avoid blocking
            summary = await asyncio.to_thread(model_img.generate_content, [jobdesc_text, image])
        finally:
            # Delete the PDF after processing, on failure too, so uploads
            # never accumulate on disk.
            try:
                os.remove(pdf_file)
            except FileNotFoundError:
                pass

        return format_summary(summary.text)
|
||
|
|
||
|
@app.after_request
def add_security_headers(response):
    """Attach the fixed set of security headers to every outgoing response."""
    csp_directives = (
        "default-src 'self'; ",
        "script-src 'self' https://www.google.com https://www.gstatic.com https://code.jquery.com https://cdn.jsdelivr.net https://www.recaptcha.net; ",
        "style-src 'self' https://cdn.jsdelivr.net; ",
        "img-src 'self' data: https://www.google.com https://www.gstatic.com; ",
        "frame-src 'self' https://www.google.com https://www.recaptcha.net; ",
        "connect-src 'self' https://www.google.com https://www.gstatic.com https://www.recaptcha.net; ",
        "object-src 'none'; ",
    )
    security_headers = {
        'X-Frame-Options': 'DENY',
        'X-Content-Type-Options': 'nosniff',
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        'Content-Security-Policy': "".join(csp_directives),
    }
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value
    return response
|
||
|
|
||
|
@app.route('/', methods=['GET'])
def upload_page():
    """Render the upload form, passing the reCAPTCHA site key to the template."""
    template_context = {"recaptcha_site_key": RECAPTCHA_SITE_KEY}
    return render_template('upload.html', **template_context)
|
||
|
|
||
|
@app.route('/', methods=['POST'])
@limiter.limit("1 per 10 seconds")
async def process_upload():
    """Validate an upload and return the model's compatibility summary as JSON.

    Steps: verify the reCAPTCHA token, require both a job description and a
    file, check the extension and size, save the file under a random name,
    run the PDF safety checks, then summarize.

    Returns a JSON body with ``summary`` and ``score`` on success (``score``
    is None when no "<n>/100" or "<n> out of 100" appears in the summary),
    or a JSON ``error`` with status 400.
    """
    recaptcha_response = request.form.get('g-recaptcha-response')
    try:
        # A timeout keeps a stalled verification service from hanging the request.
        recaptcha_verification = requests.post(
            "https://www.google.com/recaptcha/api/siteverify",
            data={"secret": RECAPTCHA_SECRET_KEY, "response": recaptcha_response},
            timeout=10,
        ).json()
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON reply counts as "not verified".
        recaptcha_verification = {}
    if not recaptcha_verification.get("success"):
        return jsonify({"error": "reCAPTCHA verification failed."}), 400

    text_input = sanitize_text(request.form.get('text_input', '').strip())
    file = request.files.get('file')

    # Both inputs are required, as the message says. The original used `and`,
    # which let a request through when only one of them was missing.
    if not text_input or not file:
        return jsonify({"error": "Job description and PDF file are required."}), 400

    if file and allowed_file(file.filename):
        # content_length is usually unset for multipart parts, so measure the
        # uploaded stream itself to enforce the limit.
        file.stream.seek(0, os.SEEK_END)
        file_size = file.stream.tell()
        file.stream.seek(0)
        if file_size > app.config['MAX_FILE_SIZE']:
            return jsonify({"error": "File size exceeds 4MB limit."}), 400

        # Store under a random name so the client-supplied filename is never
        # used on disk.
        filename = f"{uuid.uuid4().hex}.pdf"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(filepath)

        if not is_valid_pdf(filepath):
            os.remove(filepath)
            return jsonify({"error": "Invalid or potentially harmful PDF."}), 400

        summary = await summarize_pdf(filepath, text_input)
        score_match = re.search(r"(\d{1,3})\s*(?:\/|out of)\s*100", summary, re.IGNORECASE)
        score = int(score_match.group(1)) if score_match else None
        return jsonify({"summary": summary, "score": score})

    return jsonify({"error": "Invalid file format."}), 400
|
||
|
|
||
|
if __name__ == '__main__':
    # Bind address and port come from the environment, with fixed fallbacks.
    bind_host = os.getenv("FLASK_RUN_HOST", "0.0.0.0")
    bind_port = int(os.getenv("FLASK_RUN_PORT", 49465))
    app.run(host=bind_host, port=bind_port, debug=False)
|