-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathapp.py
More file actions
330 lines (267 loc) · 10.5 KB
/
app.py
File metadata and controls
330 lines (267 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
from flask import Flask, render_template, request, url_for, jsonify, send_file
import os
import cv2
import mediapipe as mp
import base64
import numpy as np
import time
import google.generativeai as genai
from googletrans import Translator
from gtts import gTTS
import uuid
import asyncio
from googletrans import Translator
from deep_translator import GoogleTranslator # Use sync translator to avoid async issues
# --- Configuration -------------------------------------------------------
# SECURITY(review): the Gemini API key was hard-coded in source. Prefer the
# GEMINI_API_KEY environment variable; the literal fallback is kept only for
# backward compatibility and the exposed key should be rotated/revoked.
genai.configure(
    api_key=os.environ.get("GEMINI_API_KEY",
                           "AIzaSyBTUPE4zSz2cyrp2llJcZgz6Duv_c76aRw"))

# Folders holding one image per character (e.g. a.jpg) for each sign alphabet.
ASL_PATH = "static/ASL"
ISL_PATH = "static/ISL"

# Mutable module-level state shared across requests (single-user assumption):
DETECTED_TEXT = ''  # raw stream of per-frame recognized gesture characters
WL = []             # latest autocomplete suggestions
AF = ''             # last generated audio file path (legacy; see /audio route)

# Language codes accepted by /predict for translation and text-to-speech.
LANGUAGES = {
    'hi': 'Hindi',
    'mr': 'Marathi',
    'pa': 'Punjabi',
    'te': 'Telugu',
    'ta': 'Tamil',
    'kn': 'Kannada',
    'ml': 'Malayalam',
    'gu': 'Gujarati',
    'bn': 'Bengali',
    'en': 'English',
}
# Helpers for string post-processing of the detected gesture stream.
def compress_string(s, threshold=20):
    """Collapse runs of identical characters into a single character.

    Runs of at least *threshold* consecutive occurrences contribute one
    character to the output; shorter runs are dropped entirely (they are
    treated as per-frame detection noise).
    """
    kept = []
    i, n = 0, len(s)
    while i < n:
        # Find the end of the run starting at i.
        j = i
        while j + 1 < n and s[j + 1] == s[i]:
            j += 1
        if (j - i + 1) >= threshold:
            kept.append(s[i])
        i = j + 1
    return "".join(kept)
##################
# Word list for autocomplete prediction
##################
import nltk
from nltk.corpus import words

# FIX: previously nltk.download("words") ran unconditionally on every startup,
# costing a network round-trip each launch. Download only when the corpus is
# actually missing (words.words() raises LookupError in that case).
try:
    word_list = words.words()
except LookupError:
    nltk.download("words")
    word_list = words.words()
class TrieNode:
    """A single trie node: child links plus every word passing through it."""

    def __init__(self):
        self.children = {}          # char -> TrieNode
        self.is_end_of_word = False # True when a word terminates here
        self.words = []             # all inserted words routed via this node


class Trie:
    """Prefix tree supporting insertion and prefix-based autocompletion."""

    def __init__(self):
        self.root = TrieNode()

    def insert(self, word):
        """Insert *word*, recording it on every node along its path."""
        current = self.root
        for ch in word:
            current = current.children.setdefault(ch, TrieNode())
            current.words.append(word)
        current.is_end_of_word = True

    def autocomplete(self, prefix):
        """Return the sorted, de-duplicated words beginning with *prefix*."""
        current = self.root
        for ch in prefix:
            child = current.children.get(ch)
            if child is None:
                return []
            current = child
        return sorted(set(current.words))
# Populate the trie once at startup with the full NLTK English word list.
trie = Trie()
for entry in word_list:
    trie.insert(entry)
###########################################################3
# Function to translate text into sign language images
def get_sign_language_images(text, language):
    """Map each alphanumeric character of *text* to a sign-language image URL.

    Args:
        text: the input string; characters that are not letters/digits
              (spaces, punctuation) are skipped.
        language: "ASL" or "ISL" (case-insensitive after this fix).

    Returns:
        List of static-file URLs, one per mapped character; a placeholder
        image URL is substituted when the character image is missing.
    """
    # FIX: the folder choice previously compared the raw `language` string
    # while the URL used language.upper(), so a lowercase "asl" selected the
    # ISL folder on disk but emitted ASL URLs. Normalize once so both agree.
    lang = (language or "").upper()
    folder_path = ASL_PATH if lang == "ASL" else ISL_PATH
    images = []
    for letter in text.lower():
        if letter.isalpha() or letter.isdigit():
            image_filename = f"{letter}.jpg"
            image_path = os.path.join(folder_path, image_filename)
            if os.path.exists(image_path):
                images.append(url_for('static', filename=f"{lang}/{image_filename}"))
            else:
                # Missing character image — show a placeholder instead.
                images.append(url_for('static', filename="placeholder.jpg"))
    return images
# MediaPipe setup: hand-landmark utilities plus the Tasks gesture recognizer.
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils

# Short aliases for the MediaPipe Tasks gesture-recognition API.
BaseOptions = mp.tasks.BaseOptions
GestureRecognizer = mp.tasks.vision.GestureRecognizer
GestureRecognizerOptions = mp.tasks.vision.GestureRecognizerOptions
GestureRecognizerResult = mp.tasks.vision.GestureRecognizerResult
VisionRunningMode = mp.tasks.vision.RunningMode

# Global variable to store detected gesture. Written by the recognizer's
# async callback (print_result), read by the /process_frame route.
recognizer_detected = None
# Callback invoked by the MediaPipe recognizer in LIVE_STREAM mode.
def print_result(result: GestureRecognizerResult, output_image: mp.Image, timestamp_ms: int):
    """Store the top recognized gesture label in the module-level global.

    Sets `recognizer_detected` to the best category name when a gesture was
    found in this frame, or to None otherwise.
    """
    global recognizer_detected
    gestures = result.gestures
    if gestures and gestures[0]:
        recognizer_detected = gestures[0][0].category_name
        print('Gesture Recognition Result:', recognizer_detected)
    else:
        recognizer_detected = None
# Setting options for gesture recognition. LIVE_STREAM mode makes
# recognize_async() return immediately and deliver results asynchronously
# through result_callback (print_result above).
options = GestureRecognizerOptions(
    base_options=BaseOptions(model_asset_path='exported_models/gesture_recognizer.task'),
    running_mode=VisionRunningMode.LIVE_STREAM,
    result_callback=print_result)
recognizer = GestureRecognizer.create_from_options(options)
# --- Flask routes --------------------------------------------------------
app = Flask(__name__)

@app.route('/')
def home():
    # Landing page of the application.
    return render_template('home.html')
@app.route('/translate', methods=['GET', 'POST'])
def translate():
    """Text/speech -> sign-language-image translation page.

    GET renders the empty form; POST converts the submitted text (typed or
    from speech input) into a sequence of sign-language image URLs.
    """
    if request.method == 'GET':
        return render_template('trans.html')

    text_input = request.form.get("text_input")
    speech_input = request.form.get("speech_input")
    language_choice = request.form.get("language")

    # Typed text takes priority over the speech transcription.
    input_text = text_input or speech_input
    if not input_text:
        return render_template('trans.html', images=None)

    images = get_sign_language_images(input_text, language_choice)
    return render_template('trans.html', images=images, input_text=input_text, language=language_choice)
# Debounce state shared across /process_frame requests:
lr = ''      # last gesture label seen
count = 0    # consecutive frames showing `lr`

@app.route('/process_frame', methods=['GET','POST'])
def process_frame():
    """Process one webcam frame: recognize a gesture and annotate the image.

    GET renders the capture page. POST accepts a JSON body with a base64
    data-URL under "image", runs the MediaPipe gesture recognizer and hand
    landmark drawing, and returns the recognized label plus the annotated
    frame re-encoded as a JPEG data URL.

    NOTE(review): indentation in this file was reconstructed from a scrape;
    the exact nesting of the DETECTED_TEXT append below is an assumption
    (appending every frame matches compress_string's threshold=20 design) —
    confirm against the original formatting.
    """
    global DETECTED_TEXT, lr, count
    if request.method == 'GET':
        return render_template('index.html')
    global recognizer_detected
    # Millisecond timestamp required by MediaPipe's LIVE_STREAM mode.
    frame_timestamp_ms = int(time.time() * 1000)
    print(frame_timestamp_ms)
    # Get the image data from the request: strip the "data:image/...;base64,"
    # prefix and decode into an OpenCV BGR frame.
    data = request.json
    image_data = data['image'].split(',')[1]
    image_bytes = base64.b64decode(image_data)
    np_array = np.frombuffer(image_bytes, np.uint8)
    frame = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
    # Process the frame for gesture recognition; the result arrives
    # asynchronously via print_result(), which updates recognizer_detected.
    recognizer.recognize_async(mp_image, frame_timestamp_ms)
    # Process hand landmarks and draw them on the frame for visual feedback.
    with mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as hands:
        results = hands.process(rgb_frame)
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)
    # Display recognized gesture on the frame.
    # Adding a bit of delay (frame counting) to ensure proper detection:
    # the label must be stable for >= 20 consecutive frames to turn green.
    if recognizer_detected:
        if not lr:
            lr = recognizer_detected
            count += 1
        elif lr == recognizer_detected:
            count += 1
        elif lr != recognizer_detected:
            # Gesture changed: restart the stability counter.
            lr = recognizer_detected
            count = 1
        if count >= 20:
            # Green once the gesture is confirmed stable.
            cv2.putText(frame, recognizer_detected, (50, 70), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 255, 0), 2, cv2.LINE_AA)
        else:
            # Red while the gesture is still being confirmed.
            cv2.putText(frame, recognizer_detected, (50, 70), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255), 2, cv2.LINE_AA)
        # Append the per-frame label; compress_string (threshold=20) later
        # collapses the resulting long runs into single characters.
        DETECTED_TEXT = DETECTED_TEXT + recognizer_detected
    # Encode processed image for response as a JPEG data URL.
    ret, buffer = cv2.imencode('.jpg', frame)
    processed_image = base64.b64encode(buffer).decode('utf-8')
    return jsonify(result=recognizer_detected, image='data:image/jpeg;base64,' + processed_image)
# Resets the accumulated detection state back to its defaults.
@app.route('/reset', methods=['POST'])
def reset():
    """Clear the detected gesture text and the autocomplete suggestions."""
    global WL, DETECTED_TEXT
    DETECTED_TEXT = ""
    WL = []
    return jsonify(response="reset done")
# Serves real-time autocomplete suggestions for the detected gesture text.
@app.route('/autocomplete', methods=['POST'])
def P_text():
    """Return trie-based word completions for the compressed detected text."""
    global DETECTED_TEXT, WL
    if not DETECTED_TEXT:
        return jsonify(prediction="No gestures recognized")
    # Collapse repeated-frame runs into single characters before lookup.
    WL = trie.autocomplete(compress_string(DETECTED_TEXT))
    return jsonify(prediction=WL or ["no similar word found"])
# Route for sentence generation using gen-AI, translation, and TTS audio.
@app.route('/predict', methods=['POST'])
async def predict():
    """Build a sentence from recognized words, translate it, and synthesize speech.

    Expects JSON: {"words": [...], "lang": "<code from LANGUAGES>"}.
    Returns JSON with the generated English sentence, its translation, and
    the filename of a generated MP3 (served by the /audio route).
    """
    try:
        data = request.json
        recognized_words = data.get("words", [])
        lang_code = data.get("lang", "hi")
        if not recognized_words:
            return jsonify({"error": "No words provided"}), 400
        # Generate a meaningful sentence using Gemini.
        prompt = f"Create a short, natural-sounding sentence using the word(s): {', '.join(recognized_words)}. The sentence should be meaningful, casual, and easy to read. Avoid special characters and formatting. Keep it simple and relevant to the given words."
        model = genai.GenerativeModel("gemini-2.0-flash")
        response = model.generate_content(prompt)
        # Check if response is valid before touching .text.
        if not hasattr(response, "text"):
            raise ValueError("Invalid response from Gemini")
        sentence = response.text.strip()
        print(f"Generated Sentence: {sentence}")
        # googletrans >= 4 exposes an async translate(); await it here since
        # this is an async view.
        translator = Translator()
        translated = await translator.translate(sentence, src='en', dest=lang_code)
        translated_text = translated.text
        print(f"Translated Sentence: {translated_text}")
        # FIX: exist_ok avoids the check-then-create race, and audio_dir is now
        # used consistently (the original re-spelled the "temp_audio" literal).
        audio_dir = "temp_audio"
        os.makedirs(audio_dir, exist_ok=True)
        # Generate a uniquely named audio file so concurrent requests don't clash.
        filename = f"temp_{uuid.uuid4().hex}.mp3"
        filepath = os.path.join(audio_dir, filename)
        tts = gTTS(text=translated_text, lang=lang_code)
        tts.save(filepath)
        return jsonify({"sentence": sentence, "translated": translated_text, "audio": filename})
    except Exception as e:
        # Broad catch is deliberate: surface any pipeline failure as JSON 500.
        print(f"❌ Error in /predict: {e}")
        return jsonify({"error": str(e)}), 500
@app.route('/audio/<path:filename>', methods=['GET'])
def get_audio(filename):
    """Stream a generated MP3 and delete it once the response is closed.

    FIX: removed fifteen lines of dead code that followed the original
    `return response` (a second, unreachable try/except block using the
    legacy AF global).
    """
    try:
        # SECURITY: <path:filename> may contain '/' or '..'; reduce to the
        # base name so the lookup cannot escape the temp_audio directory.
        safe_name = os.path.basename(filename)
        filepath = os.path.join("temp_audio", safe_name)
        if not os.path.exists(filepath):
            return jsonify({"error": "File not found"}), 404
        response = send_file(filepath, mimetype="audio/mpeg")
        def cleanup():
            # Best-effort removal after the file has been streamed.
            if os.path.exists(filepath):
                os.remove(filepath)
        response.call_on_close(cleanup)
        return response
    except Exception as e:
        print("Error in /audio:", str(e))  # Debugging
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # Development server only; use a production WSGI server for deployment.
    app.run(debug=True)