-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPosTagger.py
129 lines (108 loc) · 4.44 KB
/
PosTagger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import glob
import math
import re
from collections import Counter
from collections import defaultdict

from nltk.corpus.reader import TaggedCorpusReader
def clean(word):
    """Return *word* lower-cased with every run of whitespace removed.

    Args:
        word: The raw token text (may include a trailing newline).

    Returns:
        The normalized token used as the lookup key for emission counts.
    """
    # Raw string: a plain '\s' is an invalid escape sequence in a normal
    # string literal and triggers a SyntaxWarning on recent Python versions.
    return re.sub(r'\s+', '', word.lower())
class PosTagger:
    """Bigram hidden-Markov-model part-of-speech tagger.

    Training (``tag``) counts tag unigrams, tag bigrams and (tag, word)
    pairs from a tagged corpus and turns them into transition and emission
    probabilities.  Decoding (``viterbi``) finds the most likely tag
    sequence for a list of words.  Any transition or emission that was never
    observed gets the small constant ``unknown_prob``.
    """

    def __init__(self, corpus_pattern="test.txt"):
        """Set up empty count tables.

        Args:
            corpus_pattern: Glob pattern selecting the tagged training
                file(s); the matches are stored in ``self.tagged_file``.
        """
        # Probability assigned to unseen transitions / emissions.
        self.unknown_prob = 0.0000000000001
        # List of matching file names (empty when nothing matches).
        self.tagged_file = glob.glob(corpus_pattern)
        self.bigram_cnt = {}
        self.unigram_cnt = {}
        self.tag_count = defaultdict(int)
        self.tag_word_count = Counter()
        self.transition_probabilities = defaultdict(lambda: self.unknown_prob)
        self.emmission_probabilities = defaultdict(lambda: self.unknown_prob)

    @staticmethod
    def _log(probability):
        """Return log(probability), mapping non-positive values to -inf."""
        return math.log(probability) if probability > 0 else float("-inf")

    def ngrams(self, text, n):
        """Return every complete n-gram of *text* as a list of tuples.

        Only full-length windows are produced, so a sequence shorter than
        *n* yields an empty list.
        """
        return [tuple(text[i:i + n]) for i in range(len(text) - n + 1)]

    def bigram_counts(self, tags):
        """Add the tag bigrams of *tags* to ``self.bigram_cnt`` and return it."""
        for bigram in self.ngrams(tags, 2):
            self.bigram_cnt[bigram] = self.bigram_cnt.get(bigram, 0) + 1
        return self.bigram_cnt

    def unigram_counts(self, tags):
        """Add the tags in *tags* to ``self.unigram_cnt`` and return it."""
        for tag in tags:
            self.unigram_cnt[tag] = self.unigram_cnt.get(tag, 0) + 1
        return self.unigram_cnt

    def tag_word_counts(self, tagged_words):
        """Count tags and (tag, word) pairs.

        Args:
            tagged_words: Iterable of ``(tag, word)`` pairs.

        Returns:
            ``self.tag_word_count``, keyed by ``(tag, word)``.
        """
        for tag, word in tagged_words:
            self.tag_count[tag] += 1
            # Counter supplies 0 for unseen keys, so no membership test is
            # needed (and the key order stays (tag, word) everywhere).
            self.tag_word_count[(tag, word)] += 1
        return self.tag_word_count

    def transition_probability(self, tags):
        """Fill ``self.transition_probabilities`` for the bigrams in *tags*.

        Each value is ``count(prev, next) / count(prev)``.  Requires
        ``bigram_counts`` and ``unigram_counts`` to have been run on the
        same tags; a missing count raises ``KeyError``.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order, so each
        # distinct bigram is computed once.
        for bigram in dict.fromkeys(self.ngrams(tags, 2)):
            self.transition_probabilities[bigram] = (
                self.bigram_cnt[bigram] / self.unigram_cnt[bigram[0]]
            )
        return self.transition_probabilities

    def emmission_probability(self, tagged_words):
        """Fill ``self.emmission_probabilities`` for *tagged_words*.

        Each value is ``count(tag, word) / count(tag)``, using the tables
        built by ``tag_word_counts``.
        """
        for tag, word in dict.fromkeys(map(tuple, tagged_words)):
            self.emmission_probabilities[tag, word] = (
                self.tag_word_count[tag, word] / self.tag_count[tag]
            )
        return self.emmission_probabilities

    def initial_probabilities(self, tag):
        """Return P(tag | START), or ``unknown_prob`` if never observed."""
        # .get() avoids inserting a new entry into the defaultdict on a miss.
        return self.transition_probabilities.get(("START", tag), self.unknown_prob)

    def viterbi(self, observable, in_states):
        """Decode the most likely tag sequence for *observable*.

        Args:
            observable: Sequence of (already cleaned) words.
            in_states: Iterable of tags; ``"START"`` and ``"END"`` are
                ignored if present.

        Returns:
            A list of ``(word, tag)`` pairs, one per word, in input order.
            An empty list is returned for an empty sentence or when no
            usable state remains.  Each pair is also printed as
            ``word,tag``.
        """
        states = set(in_states)
        states.discard("START")
        states.discard("END")
        if not observable or not states:
            return []

        log = self._log
        unknown = self.unknown_prob
        transitions = self.transition_probabilities
        emissions = self.emmission_probabilities

        # Scores are kept in log space: multiplying many small probabilities
        # underflows to 0.0 on long sentences and makes every path tie.
        first_word = observable[0]
        previous = {
            s: log(self.initial_probabilities(s))
            + log(emissions.get((s, first_word), unknown))
            for s in states
        }
        # backpointers[t][s] is the best predecessor of state s at step t+1.
        backpointers = []
        for word in observable[1:]:
            current = {}
            pointers = {}
            for s in states:
                # Ties are broken by the larger state name, because the
                # (score, state) tuples are compared as a whole.
                best_score, best_prev = max(
                    (previous[k] + log(transitions.get((k, s), unknown)), k)
                    for k in states
                )
                current[s] = best_score + log(emissions.get((s, word), unknown))
                pointers[s] = best_prev
            backpointers.append(pointers)
            previous = current

        # Start from the best final state and follow the back-pointers so the
        # result is one consistent path rather than per-position maxima.
        tag = max((score, s) for s, score in previous.items())[1]
        tags = [tag]
        for pointers in reversed(backpointers):
            tag = pointers[tag]
            tags.append(tag)
        tags.reverse()

        best_path = list(zip(observable, tags))
        for word, tag in best_path:
            print(str(word) + "," + str(tag))
        return best_path

    def tag_test(self, all_tags, path="tag_test.txt"):
        """Tag every sentence in the file at *path* and print the results.

        Lines containing ``"sentence ID"`` are echoed and start a new
        sentence; a line containing ``"<EOS>"`` triggers decoding of the
        words collected so far; every other line is taken as one word.

        Args:
            all_tags: Iterable of tags used as the candidate states.
            path: File to read; defaults to ``tag_test.txt``.
        """
        # Build the state set once instead of once per sentence.
        states = set(all_tags)
        words = []
        with open(path) as f:
            for line in f:
                if "sentence ID" in line:
                    words = []
                    print(line)
                elif "<EOS>" in line:
                    self.viterbi([clean(w) for w in words], states)
                    print("<EOS>")
                    # Do not let this sentence leak into the next one when
                    # no "sentence ID" line separates them.
                    words = []
                else:
                    words.append(line)

    def tag(self):
        """Train the model from the tagged corpus files.

        Reads the files in ``self.tagged_file`` (relative to the current
        directory) with ``TaggedCorpusReader``, wraps every sentence's tags
        in ``"START"`` / ``"END"`` markers, skips tokens whose tag is
        ``None`` or ``"NIL"``, and fills the count and probability tables.

        Returns:
            The flat tag sequence (including the markers); it can be passed
            to ``tag_test`` as the candidate states.
        """
        reader_corpus = TaggedCorpusReader('.', self.tagged_file)
        tagged_words = []
        all_tags = []
        for sent in reader_corpus.tagged_sents():  # get tagged sentences
            print(sent)
            all_tags.append("START")
            for word, tag in sent:
                if tag is None or tag == 'NIL':
                    continue
                all_tags.append(tag)
                tagged_words.append((tag, clean(word)))
            all_tags.append("END")
        self.tag_word_counts(tagged_words)
        self.bigram_cnt = self.bigram_counts(all_tags)
        self.unigram_cnt = self.unigram_counts(all_tags)
        self.transition_probability(all_tags)
        self.emmission_probability(tagged_words)
        return all_tags