-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathgcstt.py
144 lines (129 loc) · 4.89 KB
/
gcstt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#! python3
# -*- coding: utf-8 -*-
import nvksupport
import tempfile
import requests
import codecs
import random
import time
import json
import sys
import os
from google.cloud import speech_v1p1beta1
from google.cloud.speech_v1p1beta1 import enums
from google.cloud.speech_v1p1beta1 import types
from google.cloud import storage
from settings import gcsbucketName
def _duration_to_ms(duration):
    """Return a seconds/nanos duration as whole milliseconds (truncated)."""
    # Pure integer arithmetic: avoids the float round-trip of nanos / 1000000.
    return int(duration.seconds) * 1000 + int(duration.nanos) // 1000000


def generate_json(response):
    """Flatten a recognition response into a ``{'transcript': {'words': [...]}}`` dict.

    Only the first alternative of every result is used.  Each word becomes a
    dict with these keys:

    * ``'p'`` - running position of the word across all results, from 0
    * ``'s'`` - start time in milliseconds
    * ``'c'`` - the word's confidence value
    * ``'e'`` - end time in milliseconds
    * ``'w'`` - the word text

    Args:
        response: object exposing ``results``; each result exposes
            ``alternatives`` whose items expose ``words`` with ``word``,
            ``confidence``, ``start_time`` and ``end_time`` (the latter two
            with ``seconds`` and ``nanos`` attributes).

    Returns:
        The dict described above; the word list is empty when the response
        holds no words.
    """
    words = []
    for result in response.results:
        # A result without alternatives has no words to contribute; skip it
        # rather than failing on alternatives[0].
        if not result.alternatives:
            continue
        for word in result.alternatives[0].words:
            words.append({
                'p': len(words),
                's': _duration_to_ms(word.start_time),
                'c': word.confidence,
                'e': _duration_to_ms(word.end_time),
                'w': word.word,
            })
    return {'transcript': {'words': words}}
def speech_to_text(gcs_uri):
    """Run long-running speech recognition on the audio at *gcs_uri*.

    The request is fixed to MP3 input at 48000 Hz, US English, the "video"
    model, up to 11 alternatives, with word confidence, word time offsets and
    automatic punctuation enabled.

    Args:
        gcs_uri: URI of the audio, passed as ``RecognitionAudio(uri=...)``.

    Returns:
        Whatever ``operation.result()`` yields once the operation completes.
    """
    speech_client = speech_v1p1beta1.SpeechClient()
    recognition_audio = types.RecognitionAudio(uri=gcs_uri)
    # NOTE: phrase hints ("speech_contexts") are not sent with this request.
    recognition_config = dict(
        encoding=enums.RecognitionConfig.AudioEncoding.MP3,
        sample_rate_hertz=48000,
        language_code='en-US',
        max_alternatives=11,
        model="video",
        enable_word_confidence=True,
        enable_word_time_offsets=True,
        enable_automatic_punctuation=True,
    )
    pending = speech_client.long_running_recognize(recognition_config, recognition_audio)
    print('Speech-to-Text running.')
    # result() waits for the long-running operation to finish.
    return pending.result()
def upload_to_gcs(audio_filename):
    """Upload a local file to the configured bucket and return its ``gs://`` URI.

    The bucket named by ``gcsbucketName`` is created when it does not exist.
    The object is stored under the file's base name, so uploading two files
    with the same base name overwrites the first.

    Args:
        audio_filename: path of the local file to upload.

    Returns:
        ``"gs://<bucket>/<basename>"`` for the uploaded object.
    """
    basename = os.path.basename(audio_filename)
    storage_client = storage.Client()
    bucket_name = gcsbucketName
    # lookup_bucket() returns None only when the bucket is missing.  Any other
    # failure (credentials, permissions, network) propagates instead of being
    # mistaken for "bucket not found" and triggering a create attempt.
    bucket = storage_client.lookup_bucket(bucket_name)
    if bucket is None:
        bucket = storage_client.create_bucket(bucket_name)
    blob = bucket.blob(basename, chunk_size=5 * 1024 * 1024)
    blob.upload_from_filename(audio_filename)
    return "gs://" + bucket_name + '/' + basename
def clean_gcs(audio_filename):
    """Delete the bucket object named after *audio_filename*'s base name.

    Args:
        audio_filename: path whose base name identifies the object inside the
            bucket named by ``gcsbucketName``.
    """
    object_name = os.path.basename(audio_filename)
    target_bucket = storage.Client().get_bucket(gcsbucketName)
    target_bucket.blob(object_name).delete()
def exec_transcribe(filename):
    """Upload *filename*, transcribe it, and return the word-list dict.

    Chains ``upload_to_gcs`` -> ``speech_to_text`` -> ``generate_json``.
    """
    uploaded_uri = upload_to_gcs(filename)
    recognition_response = speech_to_text(uploaded_uri)
    return generate_json(recognition_response)
def SpitTranscriptFromJson(jsonIn, outPath):
    """Write the plain-text transcript of *jsonIn* to ``outPath + '.txt'``.

    Words are read from ``jsonIn["transcript"]["words"]``.  After the first
    entry, an entry carrying an ``'m'`` key is treated as a sentence marker:
    it adds ``'.'`` to the preceding word and its own text is not emitted.
    Every other entry is separated from its predecessor by a single space.

    Args:
        jsonIn: dict with a ``["transcript"]["words"]`` list whose items have
            a ``'w'`` key.
        outPath: output path without extension; ``'.txt'`` is appended.

    An empty word list produces an empty file instead of raising.
    """
    outTxtPath = outPath + '.txt'
    words = jsonIn["transcript"]["words"]

    pieces = []
    for entry in words:
        if not pieces:
            # The first entry is always emitted as text, marker or not.
            pieces.append(entry['w'])
        elif 'm' in entry:
            pieces[-1] += '.'
        else:
            pieces[-1] += ' '
            pieces.append(entry['w'])

    # The context manager closes the file even if the write fails.
    with codecs.open(outTxtPath, 'w', 'utf-8') as outTxt:
        outTxt.write(''.join(pieces))
    print('Transcript written to ' + outTxtPath)
def spitJson(mediaId, outPath):
    """Serialize *mediaId* as JSON into ``outPath + '.json'``.

    Args:
        mediaId: the JSON-serializable object to write.  The parameter name is
            kept for compatibility; the script passes the transcript dict here.
        outPath: output path without extension; ``'.json'`` is appended.
    """
    outJsonPath = outPath + '.json'
    # Dump the argument itself rather than a module-level ``jsonObj``, so the
    # function also works when called from outside the script's __main__ run.
    with codecs.open(outJsonPath, 'w', 'utf-8') as outJson:
        json.dump(mediaId, outJson)
    print('Data written to ' + outJsonPath)
if __name__ == '__main__':
    # Usage: gcstt.py [INPUT [OUTPUT_BASE]]
    #   INPUT        audio/video file, or a previously written ...json file;
    #                prompted for when omitted.
    #   OUTPUT_BASE  output path without extension; defaults to INPUT minus
    #                its extension.
    # Double quotes (e.g. left over from drag-and-drop) are stripped from both.
    cliArgs = sys.argv[1:]
    if cliArgs:
        audioIn = cliArgs[0].replace('"', '')
    else:
        audioIn = input('drag Audio or sourceVideo here').replace('"', '')
    if len(cliArgs) > 1:
        spitPath = cliArgs[1].replace('"', '')
    else:
        spitPath = os.path.splitext(audioIn)[0]

    if audioIn[-4:] == 'json':
        # Existing JSON: only regenerate the text transcript from it.
        with codecs.open(audioIn, encoding='utf-8') as jsonFile:
            SpitTranscriptFromJson(json.load(jsonFile), spitPath)
    else:
        temp = {}
        if audioIn[-4:] != '.mp3':
            # Resolve relative paths BEFORE the chdir below; otherwise both
            # the input and the output would be looked up relative to the
            # Program directory instead of the caller's working directory.
            audioIn = os.path.abspath(audioIn)
            spitPath = os.path.abspath(spitPath)

            stem = os.path.splitext(os.path.basename(audioIn))[0]
            serial = str(random.randint(0, 255))
            temp['path'] = os.path.join(tempfile.gettempdir(), stem + serial)
            temp['audio_path'] = temp['path'] + '_a' + '.mp3'

            scriptDir = os.path.dirname(os.path.realpath(sys.argv[0]))
            libDir = os.path.join(scriptDir, 'Program')
            # NOTE(review): presumably nvksupport expects its helper binaries
            # in ./Program relative to the cwd - confirm.
            os.chdir(libDir)
            nvksupport.ConvertAudioVoicebase(audioIn, temp['audio_path'])
            audioIn = temp['audio_path']

        try:
            jsonObj = exec_transcribe(audioIn)
        finally:
            # Clean up temporary files even when upload/transcription fails.
            nvksupport.tempClean(temp)
        # NOTE(review): the uploaded object stays in the bucket; clean_gcs()
        # is defined in this file but never called - confirm that is intended.
        spitJson(jsonObj, spitPath)
        SpitTranscriptFromJson(jsonObj, spitPath)