Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Multiple-Feature Select Model in Python_Code #21

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions Auto_Run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import pandas as pd
import numpy as np
import yaml
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

from FS.pso import jfs as jfs_pso
from FS.ga import jfs as jfs_ga
from FS.de import jfs as jfs_de
from FS.ba import jfs as jfs_ba
from FS.cs import jfs as jfs_cs
from FS.fa import jfs as jfs_fa
from FS.fpa import jfs as jfs_fpa
from FS.sca import jfs as jfs_sca
from FS.woa import jfs as jfs_woa


class DataPipeline(object):
    """
    Pipeline that loads a CSV data set, splits it into train/test folds,
    runs every feature-selection algorithm listed in a YAML parameter file,
    and reports KNN accuracy plus a convergence plot for each of them.
    """

    def __init__(self, _ori_data_path: str, _train_test_ratio: float, _str_read_meta_para: str) -> None:
        """
        Store the run configuration; nothing is read from disk here.

        _ori_data_path: path of the raw CSV file handed to pandas.read_csv
        _train_test_ratio: value passed as test_size to train_test_split
        _str_read_meta_para: path of the YAML file holding the 'meta_para' list
        """
        self._ori_data_path = _ori_data_path
        self._train_test_ratio = _train_test_ratio
        self._str_read_meta_para = _str_read_meta_para

        self.data = None       # raw table as a NumPy array
        self.feat = None       # every column except the last one
        self.label = None      # last column

        self.fold = None       # dict with the split train/test data
        self.meta_para = None  # 'meta_para' entry of the YAML file

        self.sf = None         # selected feature indices (model['sf'])
        self.fmdl = None       # result dict returned by the last jfs call

    def _load_ori_file(self, _ori_path) -> None:
        """
        Load the raw CSV file and separate features from the label column.

        The last column is used as the label; all columns before it are
        used as features.
        """
        self.data = pd.read_csv(_ori_path)
        self.data = self.data.values
        # All feature columns
        self.feat = np.asarray(self.data[:, 0:-1])
        # Target column
        self.label = np.asarray(self.data[:, -1])

    def _data_split(self, ratio: float) -> None:
        """
        Split features/labels into a stratified train/test fold.

        ratio: forwarded to train_test_split as test_size
        """
        xtrain, xtest, ytrain, ytest = train_test_split(
            self.feat, self.label, test_size=ratio, stratify=self.label)
        self.fold = {'xt': xtrain, 'yt': ytrain, 'xv': xtest, 'yv': ytest}

    def _read_algo_parameter(self, _str_read_meta_para: str) -> None:
        """
        Read the algorithm parameter file and keep its 'meta_para' entry.

        _str_read_meta_para: path of the YAML file to open. The path given
        by the caller is honoured (it used to be ignored in favour of a
        hard-coded 'algo_para.yaml' in the working directory).
        """
        # NOTE(review): yaml.full_load is kept as in the original; prefer
        # yaml.safe_load if this file can ever come from an untrusted source.
        with open(_str_read_meta_para, 'r') as para_file:
            self.meta_para = yaml.full_load(para_file)['meta_para']

    def _build_fmdl(self, algo_name):
        """
        Return the jfs entry point registered under algo_name.

        Raises KeyError when algo_name is not one of the known short names.
        """
        interface_fmdl = {
            'pso': jfs_pso,
            'ga': jfs_ga,
            'de': jfs_de,
            'ba': jfs_ba,
            'cs': jfs_cs,
            'fa': jfs_fa,
            'fpa': jfs_fpa,
            'sca': jfs_sca,
            'woa': jfs_woa,
        }
        return interface_fmdl[algo_name]

    def _process_feature_select(self, strMetaName: str, listMetaOpts: dict) -> None:
        """
        Run the feature-selection algorithm named strMetaName.

        listMetaOpts is the option mapping for that algorithm; the current
        fold is stored into it under the 'fold' key (the caller's mapping is
        modified in place) before it is passed to the algorithm.
        """
        # Append fold data into the options handed to the algorithm
        listMetaOpts['fold'] = self.fold
        self.fmdl = self._build_fmdl(strMetaName)(self.feat, self.label, listMetaOpts)
        self.sf = self.fmdl['sf']

    def _data_feature_select(self, strMetaName: str, listMetaOpts: dict) -> None:
        """
        Fit a KNN classifier on the selected features and print its accuracy
        on the test fold together with the number of selected features.

        listMetaOpts['k'] is used as the number of neighbours.
        """
        num_train = np.size(self.fold['xt'], 0)
        num_valid = np.size(self.fold['xv'], 0)
        x_train = self.fold['xt'][:, self.sf]
        y_train = self.fold['yt'].reshape(num_train)  # flatten to 1-D
        x_valid = self.fold['xv'][:, self.sf]
        y_valid = self.fold['yv'].reshape(num_valid)  # flatten to 1-D

        mdl = KNeighborsClassifier(n_neighbors=listMetaOpts['k'])
        mdl.fit(x_train, y_train)

        # accuracy
        y_pred = mdl.predict(x_valid)
        Acc = np.sum(y_valid == y_pred) / num_valid
        print("Accuracy:", 100 * Acc)

        # number of selected features
        num_feat = self.fmdl['nf']
        print("Feature Size:", num_feat)

    def _plot_converege(self, strMetaName: str, listMetaOpts: dict) -> None:
        """
        Plot the convergence curve stored under 'c' in the last result.

        The x axis runs from 1 to listMetaOpts['T']; strMetaName is the title.
        """
        curve = self.fmdl['c']
        # The curve is indexed along axis 1; flatten it for plotting.
        curve = curve.reshape(np.size(curve, 1))
        x = np.arange(0, listMetaOpts['T'], 1.0) + 1.0
        fig, ax = plt.subplots()
        ax.plot(x, curve, 'o-')
        ax.set_xlabel('Number of Iterations')
        ax.set_ylabel('Fitness')
        ax.set_title(strMetaName)
        ax.grid()
        plt.show()

    def proceed(self) -> None:
        """
        Execute the whole pipeline: load, split, read the parameters, then
        select / evaluate / plot once per entry of the parameter list.
        """
        self._load_ori_file(self._ori_data_path)
        self._data_split(self._train_test_ratio)
        self._read_algo_parameter(self._str_read_meta_para)

        for meta in self.meta_para:
            print(meta['name'])
            print(meta['opts'])
            self._process_feature_select(meta['name'], meta['opts'])
            self._data_feature_select(meta['name'], meta['opts'])
            self._plot_converege(meta['name'], meta['opts'])

def main():
    """Build a DataPipeline for './ionosphere.csv' and run it end to end."""
    dataset_csv = './ionosphere.csv'
    algo_config = './algo_para.yaml'
    test_fraction = 0.3

    pipeline = DataPipeline(dataset_csv, test_fraction, algo_config)
    pipeline.proceed()


if __name__ == '__main__':
    main()

33 changes: 33 additions & 0 deletions FS/__basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import numpy as np
from numpy.random import rand
from FS.__tran_func import tran_func

def init_position(lb, ub, N, dim):
    """
    Draw an (N, dim) float array of uniformly random start positions.

    lb, ub: 2-D arrays whose first row holds the per-dimension lower and
            upper bounds (only row 0, columns 0..dim-1 are read)
    N:      number of rows (one per candidate solution)
    dim:    number of columns (one per dimension)

    Entry [i, d] equals lb[0, d] + (ub[0, d] - lb[0, d]) * r with r drawn
    from numpy.random.rand. A single rand(N, dim) call replaces the former
    N * dim scalar calls; it consumes the global random stream in the same
    row-major order.
    """
    lower = np.asarray(lb)[0, :dim]
    upper = np.asarray(ub)[0, :dim]
    return lower + (upper - lower) * rand(N, dim)

def binary_conversion(X, thres, N, dim, trans_func=tran_func.sl_trans):
    """
    Turn the first N rows / dim columns of X into a 0/1 integer array.

    Each entry is passed through trans_func on its own; the result is 1
    when the transformed value is strictly greater than thres, else 0.
    """
    # NOTE(review): the default sl_trans is 1 / (1 + exp(-2 * v)), which is
    # >= 0.5 for every v >= 0, so with thres = 0.5 any strictly positive
    # entry maps to 1 -- confirm that callers intend this.
    Xbin = np.zeros([N, dim], dtype='int')
    for row, col in np.ndindex(N, dim):
        # The array starts out as zeros, so only the "above" case is written.
        if trans_func(X[row, col]) > thres:
            Xbin[row, col] = 1
    return Xbin


def boundary(x, lb, ub):
    """
    Clamp the scalar x into [lb, ub].

    The lower bound is applied first and the upper bound second, so when
    lb > ub the upper bound wins.
    """
    raised = lb if x < lb else x
    return ub if raised > ub else raised

10 changes: 10 additions & 0 deletions FS/__tran_func.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import numpy as np
class tran_func():
    """Collection of transfer functions, exposed as static methods."""

    @staticmethod
    def l_trans(val):
        """Identity transfer: return val unchanged."""
        return val

    @staticmethod
    def sl_trans(val):
        """Sigmoid-shaped transfer: 1 / (1 + exp(-2 * val))."""
        decay = np.exp(-2 * val)
        return 1 / (1 + decay)
37 changes: 4 additions & 33 deletions FS/ba.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,13 @@
import numpy as np
from numpy.random import rand
from FS.functionHO import Fun


def init_position(lb, ub, N, dim):
    """
    Draw an (N, dim) float array of uniformly random start positions.

    lb, ub: 2-D arrays whose first row holds the per-dimension lower and
            upper bounds (only row 0, columns 0..dim-1 are read)
    N:      number of rows
    dim:    number of columns

    Entry [i, d] equals lb[0, d] + (ub[0, d] - lb[0, d]) * r with r drawn
    from numpy.random.rand. One rand(N, dim) call replaces the nested
    Python loops and consumes the random stream in the same row-major order.
    """
    lower = np.asarray(lb)[0, :dim]
    upper = np.asarray(ub)[0, :dim]
    return lower + (upper - lower) * rand(N, dim)


def binary_conversion(X, thres, N, dim):
    """
    Threshold the first N rows / dim columns of X into a 0/1 integer array.

    An entry becomes 1 when it is strictly greater than thres, else 0.
    X must provide at least N rows and dim columns. The comparison is done
    in one vectorized step instead of a Python-level double loop.
    """
    Xbin = np.zeros([N, dim], dtype='int')
    # Boolean mask is cast to 0/1 on assignment into the int array.
    Xbin[:, :] = np.asarray(X)[:N, :dim] > thres
    return Xbin


def boundary(x, lb, ub):
    """
    Clamp the scalar x into [lb, ub].

    The lower bound is applied first and the upper bound second, so when
    lb > ub the upper bound wins.
    """
    raised = lb if x < lb else x
    return ub if raised > ub else raised

from FS.__basic import init_position, binary_conversion, boundary


def jfs(xtrain, ytrain, opts):
# Parameters
ub = 1
lb = 0
ub = opts['ub']
lb = opts['lb']
thres = 0.5
fmax = 2 # maximum frequency
fmin = 0 # minimum frequency
Expand Down
35 changes: 3 additions & 32 deletions FS/cs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,10 @@
import numpy as np
from numpy.random import rand
from FS.functionHO import Fun
from FS.__basic import init_position, binary_conversion, boundary
import math


def init_position(lb, ub, N, dim):
    """
    Draw an (N, dim) float array of uniformly random start positions.

    lb, ub: 2-D arrays whose first row holds the per-dimension lower and
            upper bounds (only row 0, columns 0..dim-1 are read)
    N:      number of rows
    dim:    number of columns

    Entry [i, d] equals lb[0, d] + (ub[0, d] - lb[0, d]) * r with r drawn
    from numpy.random.rand. One rand(N, dim) call replaces the nested
    Python loops and consumes the random stream in the same row-major order.
    """
    lower = np.asarray(lb)[0, :dim]
    upper = np.asarray(ub)[0, :dim]
    return lower + (upper - lower) * rand(N, dim)


def binary_conversion(X, thres, N, dim):
    """
    Threshold the first N rows / dim columns of X into a 0/1 integer array.

    An entry becomes 1 when it is strictly greater than thres, else 0.
    X must provide at least N rows and dim columns. The comparison is done
    in one vectorized step instead of a Python-level double loop.
    """
    Xbin = np.zeros([N, dim], dtype='int')
    # Boolean mask is cast to 0/1 on assignment into the int array.
    Xbin[:, :] = np.asarray(X)[:N, :dim] > thres
    return Xbin


def boundary(x, lb, ub):
    """
    Clamp the scalar x into [lb, ub].

    The lower bound is applied first and the upper bound second, so when
    lb > ub the upper bound wins.
    """
    raised = lb if x < lb else x
    return ub if raised > ub else raised


# Levy Flight
def levy_distribution(beta, dim):
# Sigma
Expand All @@ -54,8 +25,8 @@ def levy_distribution(beta, dim):

def jfs(xtrain, ytrain, opts):
# Parameters
ub = 1
lb = 0
ub = opts['ub']
lb = opts['lb']
thres = 0.5
Pa = 0.25 # discovery rate
alpha = 1 # constant
Expand Down
35 changes: 3 additions & 32 deletions FS/de.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,13 @@
import numpy as np
from numpy.random import rand
from FS.functionHO import Fun


def init_position(lb, ub, N, dim):
    """
    Draw an (N, dim) float array of uniformly random start positions.

    lb, ub: 2-D arrays whose first row holds the per-dimension lower and
            upper bounds (only row 0, columns 0..dim-1 are read)
    N:      number of rows
    dim:    number of columns

    Entry [i, d] equals lb[0, d] + (ub[0, d] - lb[0, d]) * r with r drawn
    from numpy.random.rand. One rand(N, dim) call replaces the nested
    Python loops and consumes the random stream in the same row-major order.
    """
    lower = np.asarray(lb)[0, :dim]
    upper = np.asarray(ub)[0, :dim]
    return lower + (upper - lower) * rand(N, dim)


def binary_conversion(X, thres, N, dim):
    """
    Threshold the first N rows / dim columns of X into a 0/1 integer array.

    An entry becomes 1 when it is strictly greater than thres, else 0.
    X must provide at least N rows and dim columns. The comparison is done
    in one vectorized step instead of a Python-level double loop.
    """
    Xbin = np.zeros([N, dim], dtype='int')
    # Boolean mask is cast to 0/1 on assignment into the int array.
    Xbin[:, :] = np.asarray(X)[:N, :dim] > thres
    return Xbin


def boundary(x, lb, ub):
    """
    Clamp the scalar x into [lb, ub].

    The lower bound is applied first and the upper bound second, so when
    lb > ub the upper bound wins.
    """
    raised = lb if x < lb else x
    return ub if raised > ub else raised
from FS.__basic import init_position, binary_conversion, boundary


def jfs(xtrain, ytrain, opts):
# Parameters
ub = 1
lb = 0
ub = opts['ub']
lb = opts['lb']
thres = 0.5
CR = 0.9 # crossover rate
F = 0.5 # factor
Expand Down
35 changes: 3 additions & 32 deletions FS/fa.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,13 @@
import numpy as np
from numpy.random import rand
from FS.functionHO import Fun


def init_position(lb, ub, N, dim):
    """
    Draw an (N, dim) float array of uniformly random start positions.

    lb, ub: 2-D arrays whose first row holds the per-dimension lower and
            upper bounds (only row 0, columns 0..dim-1 are read)
    N:      number of rows
    dim:    number of columns

    Entry [i, d] equals lb[0, d] + (ub[0, d] - lb[0, d]) * r with r drawn
    from numpy.random.rand. One rand(N, dim) call replaces the nested
    Python loops and consumes the random stream in the same row-major order.
    """
    lower = np.asarray(lb)[0, :dim]
    upper = np.asarray(ub)[0, :dim]
    return lower + (upper - lower) * rand(N, dim)


def binary_conversion(X, thres, N, dim):
    """
    Threshold the first N rows / dim columns of X into a 0/1 integer array.

    An entry becomes 1 when it is strictly greater than thres, else 0.
    X must provide at least N rows and dim columns. The comparison is done
    in one vectorized step instead of a Python-level double loop.
    """
    Xbin = np.zeros([N, dim], dtype='int')
    # Boolean mask is cast to 0/1 on assignment into the int array.
    Xbin[:, :] = np.asarray(X)[:N, :dim] > thres
    return Xbin


def boundary(x, lb, ub):
    """
    Clamp the scalar x into [lb, ub].

    The lower bound is applied first and the upper bound second, so when
    lb > ub the upper bound wins.
    """
    raised = lb if x < lb else x
    return ub if raised > ub else raised
from FS.__basic import init_position, binary_conversion, boundary


def jfs(xtrain, ytrain, opts):
# Parameters
ub = 1
lb = 0
ub = opts['ub']
lb = opts['lb']
thres = 0.5
alpha = 1 # constant
beta0 = 1 # light amplitude
Expand Down
Loading