Skip to content

Commit e62a4b8

Browse files
authored
Merge pull request #3 from gistrec/develop
Delete multithreading, remove unnecessary functions
2 parents affcb38 + 1db6298 commit e62a4b8

File tree

5 files changed

+64
-65
lines changed

5 files changed

+64
-65
lines changed

src/Config.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
std::string fileName;
77
int mtu; // Max packet size to send and receive
88

9-
std::atomic<int> ttl; // Max time to wait new packets
9+
int ttl; // Max time to wait new packets
1010
int ttl_max;
1111

1212
SOCKET _socket;

src/Main.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ int main(int argc, char* argv[]) {
6464
std::cerr << "Cant't get access to broadcast" << std::endl; //
6565
exit(1); //
6666
} // If option 'broadcast' is yes
67-
broadcast_address.sin_addr.s_addr = INADDR_ANY; // change server address
67+
broadcast_address.sin_addr.s_addr = INADDR_BROADCAST; // change server address
6868
} else { // to broadcast
6969
broadcast_address.sin_addr.s_addr = // Else change server address
7070
inet_addr(result["broadcast"].as<std::string>().c_str()); // to address in option
@@ -77,6 +77,16 @@ int main(int argc, char* argv[]) {
7777
exit(1); //
7878
} //
7979

80+
#if defined(_WIN32) || defined(_WIN64) //
81+
int tv = 1 * 1000; // user timeout in milliseconds [ms] //
82+
setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv, sizeof(tv)); // Set socket
83+
#else // receive timeout
84+
struct timeval tv; // to 1 sec
85+
tv.tv_sec = 1; //
86+
tv.tv_usec = 0; //
87+
setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)); //
88+
#endif
89+
8090
// Run receiver or sender
8191
if (result["type"].as<std::string>() == "receiver") { //
8292
Receiver::run(result); //
@@ -87,4 +97,4 @@ int main(int argc, char* argv[]) {
8797
}
8898

8999
return 0;
90-
}
100+
}

src/Receiver.cpp

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ std::vector<int> getEmptyParts() {
2222
}
2323

2424
/**
25-
* Run in secondary thread
2625
* Start while receive packet 'FINISH'
2726
* Get empty part's and requests it's from the server
2827
*/
29-
void async checkParts() {
28+
void checkParts() {
3029
char* buffer = new char[100];
3130

3231
std::vector<int> emptyParts = getEmptyParts();
@@ -40,29 +39,36 @@ void async checkParts() {
4039

4140
std::cout << "Requested " << index << " part" << std::endl;
4241
}
43-
std::this_thread::sleep_for(1s);
4442
emptyParts = getEmptyParts();
4543
}
4644

4745
std::ofstream output(fileName, std::ofstream::binary); //
4846
output.write(file, file_length); // Save file
49-
system(std::string("CertUtil -hashfile " + fileName + " SHA256").c_str()); //
5047
}
5148

5249

5350
void run(cxxopts::ParseResult &options) {
54-
std::thread(checkTTL); // Create thread, to check server timeout
51+
bool finish = false; // Sender finish transfering
5552

56-
char* buffer = new char[2 * mtu];
53+
char* buffer;
5754

5855
while (auto length = recvfrom(_socket, buffer, 2 * mtu, 0, (sockaddr*) &server_address, &server_address_length)) {
56+
// Sender is no longer available
57+
if (ttl <= 0) return;
58+
59+
// If Sender finish transfering check missing parts every 1 sec
60+
if (finish) checkParts();
61+
62+
if (length <= 0) continue; // If no more packets in buffer
63+
5964
ttl = ttl_max; // Update ttl
6065

6166
if (strncmp(buffer, "NEW_PACKET", 10) == 0) {
6267
file_length = Utils::getIntFromBytes(buffer + 10, 4);
68+
mtu = Utils::getIntFromBytes(buffer + 14, 4);
6369

70+
buffer = new char[2 * mtu];
6471
file = new char[file_length];
65-
memset(file, 0, file_length);
6672

6773
std::cout << "Receive information about new file size: " << file_length << std::endl;
6874
std::cout << "Part count: " << int((float)file_length / (float)mtu + 0.5) << std::endl;
@@ -74,10 +80,11 @@ void run(cxxopts::ParseResult &options) {
7480

7581
memcpy(file + part * options["mtu"].as<int>(), buffer + 16, size);
7682
} else if (strncmp(buffer, "FINISH", 6) == 0) {
77-
std::cout << "Server finish transfering" << std::endl;
78-
79-
std::cout << "Create thread, witch ask not accepted parts" << std::endl;
80-
std::thread(checkParts).detach();
83+
// If we don't receive a finish message
84+
if (!finish) {
85+
std::cout << "Server finish transfering" << std::endl;
86+
finish = true;
87+
}
8188
}
8289
}
8390
delete[] buffer;

src/Sender.cpp

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ void sendPart(int part_index) {
2626
}
2727

2828
void run(cxxopts::ParseResult &options) {
29-
std::mt19937 gen(time(0)); // TODO: remove (for tests only)
30-
std::uniform_int_distribution<> uid(0, 100); // TODO: remove (for tests only)
31-
3229
buffer = new char[2 * mtu];
3330

3431
std::ifstream input(fileName, std::ios::binary); //
@@ -45,46 +42,41 @@ void run(cxxopts::ParseResult &options) {
4542

4643

4744
snprintf(buffer, 11, "NEW_PACKET"); //
48-
Utils::writeBytesFromInt(buffer + 10, file_length, 4); // Send information
45+
Utils::writeBytesFromInt(buffer + 10, file_length, 4); //
46+
Utils::writeBytesFromInt(buffer + 14, mtu, 4); // Send information
4947
sendto(_socket, buffer, 14, 0, (sockaddr*) &broadcast_address, // about new file
50-
sizeof(broadcast_address)); //
51-
std::cout << "Send information about new file with size " << (int) file_length << std::endl;
48+
sizeof(broadcast_address)); // + size and mtu
49+
std::cout << "Send information about new file with size "; //
50+
std::cout << (int) file_length << " mtu " << mtu <<std::endl; //
5251

5352
int part_index = 0;
5453

5554
while (part_index * mtu < file_length) { //
56-
sent_part.insert({ part_index, 0 }); //
57-
//
58-
if (uid(gen) < 90) { // Send all parts every 1 sec
59-
sendPart(part_index); //
60-
} //
61-
//
55+
sent_part.insert({ part_index, 0 }); //
56+
//
57+
sendPart(part_index); // Send parts
58+
// every 20ms
6259
part_index++; //
63-
std::this_thread::sleep_for(1s); //
60+
std::this_thread::sleep_for(20ms); //
6461
} //
6562

6663
snprintf(buffer, 7, "FINISH"); // Send information
6764
sendto(_socket, buffer, 6, 0, (sockaddr*) &broadcast_address, // the end of transaction
6865
sizeof(broadcast_address)); //
6966
std::cout << "File transfer complete" << std::endl;
67+
// Last time, while sender send message about finish
68+
long lastFinishSendTime = 0;
7069

71-
// Set socket receive timeout to 1 sec
72-
#if defined(_WIN32) || defined(_WIN64)
73-
int tv = 1 * 1000; // user timeout in milliseconds [ms]
74-
setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv, sizeof(tv));
75-
#else
76-
struct timeval tv;
77-
tv.tv_sec = 1;
78-
tv.tv_usec = 0;
79-
setsockopt(_socket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
80-
#endif
81-
82-
while (ttl--) {
70+
while (ttl) {
8371
auto result = recvfrom(_socket, (char *)buffer, 100, 0, (struct sockaddr*) &broadcast_address, &client_address_length);
84-
std::cout << result << std::endl;
85-
if (result <= 0) continue;
8672

87-
ttl = ttl_max;
73+
// Every seconds send information about end of sending file
74+
if (result <= 0) {
75+
ttl--;
76+
snprintf(buffer, 7, "FINISH");
77+
sendto(_socket, buffer, 6, 0, (struct sockaddr*) &broadcast_address, sizeof(broadcast_address));
78+
continue;
79+
}
8880

8981
if (strncmp(buffer, "RESEND", 6) == 0) {
9082
int part = Utils::getIntFromBytes(buffer + 6, 4);
@@ -93,22 +85,27 @@ void run(cxxopts::ParseResult &options) {
9385
auto now_ms = std::chrono::time_point_cast<std::chrono::seconds>(now);
9486
auto epoch = now_ms.time_since_epoch();
9587
auto value = std::chrono::duration_cast<std::chrono::seconds>(epoch);
96-
long duration = value.count();
88+
long duration = value.count(); // Unix time in second
89+
90+
ttl = ttl_max;
9791

98-
if (duration - sent_part[part] > 1) {
92+
if (duration - sent_part[part] >= 1) {
9993
sent_part[part] = duration;
10094
std::cout << "Клиент запросил " << part << " часть" << std::endl;
10195
sendPart(part);
10296
}
103-
} else if (strncmp(buffer, "STATUS", 6) == 0) {
104-
std::cout << "Клиент запросил статус" << std::endl;
10597

106-
snprintf(buffer, 6, "FINISH");
107-
sendto(_socket, buffer, 6, 0, (struct sockaddr*) &broadcast_address, sizeof(broadcast_address));
98+
// Every seconds send information about end of sending file
99+
if (duration - lastFinishSendTime >= 1) {
100+
lastFinishSendTime = duration;
101+
snprintf(buffer, 7, "FINISH");
102+
sendto(_socket, buffer, 6, 0, (struct sockaddr*) &broadcast_address, sizeof(broadcast_address));
103+
}
108104
}
105+
109106
}
110107
std::cout << "Process no longer be working" << std::endl;
111108
}
112109

113110

114-
} //namespace Sender
111+
} //namespace Sender

src/Utils.hpp

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define FILEBROADCASTER_UTILS_H
33

44
#if defined(_WIN32) || defined(_WIN64)
5+
#define _WINSOCK_DEPRECATED_NO_WARNINGS
56
#define addr_len int //
67
#include <winsock2.h> // Windows
78
#include <windows.h> // socket
@@ -11,7 +12,6 @@
1112
#define SOCKADDR_IN sockaddr_in //
1213
#define addr_len socklen_t // Linux socket
1314
#include <arpa/inet.h> //
14-
#include <pthread.h> //
1515
#endif
1616

1717
#include <thread>
@@ -33,9 +33,6 @@
3333
#include "../lib/cxxopts/include/cxxopts.hpp"
3434
#include "Config.hpp"
3535

36-
#define _WINSOCK_DEPRECATED_NO_WARNINGS
37-
#define async
38-
3936

4037
using namespace std::chrono_literals;
4138

@@ -67,18 +64,6 @@ namespace Utils {
6764
buffer[count - i - 1] = (char) (value >> (i * 8));
6865
}
6966
}
70-
71-
/**
72-
* Run in secondary thread
73-
* Decrease ttl every second
74-
* If ttl < 0 program should stop
75-
*/
76-
void async checkTTL() {
77-
while (ttl--) {
78-
std::this_thread::sleep_for(1s);
79-
}
80-
std::cout << "Process no longer be working" << std::endl;
81-
}
8267
}
8368

8469
#endif //FILEBROADCASTER_UTILS_H

0 commit comments

Comments
 (0)