diff --git a/.gitignore b/.gitignore
index d8e83da2..3406b1c5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,4 +56,4 @@ core
 !core/
 *.log.*
 *.log
-
+*yyl*
diff --git a/RELEASE.md b/RELEASE.md
index d8e4f39a..60eb9340 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -3,6 +3,7 @@
 - [Release Notes](#release-notes)
   - [Introduction](#introduction)
+  - [Rosetta v0.3.0](#rosetta-v030)
   - [Rosetta v0.2.1](#rosetta-v021)
   - [Rosetta v0.2.0](#rosetta-v020)
   - [Rosetta v0.1.1](#rosetta-v011)
@@ -19,6 +20,18 @@ This document will maintain and continually update the release notes of each version of Rosetta.
 
 If you have questions or comments, please contact us via rosetta@latticex.foundation.
 
+## Rosetta v0.3.0
+
+1. Added the `PrivateTextLineDataset` and `PrivateInput` secure operations.
+
+2. Added the `SecureLogicalAnd`, `SecureLogicalOr`, `SecureLogicalXor` and `SecureLogicalNot` secure operations.
+
+3. Sped up some backend operations.
+
+4. Use related Python classes such as `PrivateTextLineDataset` and its iterators to load large datasets, thereby reducing memory usage.
+
+5. Fixed some known bugs.
+
 ## Rosetta v0.2.1
 
 1. Support 128-bit integer data type, which enables big integer and high precision float-point operations.
diff --git a/cc/modules/common/include/utils/file_directory.h b/cc/modules/common/include/utils/file_directory.h
index 3d862057..8c040d74 100644
--- a/cc/modules/common/include/utils/file_directory.h
+++ b/cc/modules/common/include/utils/file_directory.h
@@ -26,6 +26,13 @@
 #include 
 #include 
 #include 
+#include 
+#include 
+#include 
+#include 
+#include "cc/modules/common/include/utils/logger.h"
+
+
 using namespace std;
 
 /**
@@ -134,3 +141,77 @@ static void fromfile(vector>& v, const string& filename) {
   ifile.close();
 }
+// Get the line and field counts of a `csv` file
+//
+// Arguments:
+// * file: file path to analyse.
+// * delimiter: the delimiter of fields.
+// * lines: output, the count of lines
+// * fields: output, the count of fields in a line
+// * ignore_blank_line: whether to skip blank lines
+//
+// Returns 0 on success, or -1 if the file cannot be opened.
+//
+// TODO(kelvin):
+int get_file_lines_fields(const string& file, char delimiter, int& lines, int& fields, bool ignore_blank_line=false)
+{
+  fields = 1;
+  lines = 0;
+  const int buf_size = 16*1024;
+  char buf[buf_size] = {0};
+  int fd = open(file.data(), O_RDONLY);
+  if (fd == -1)
+  {
+    log_error << "open file: " << file << " failed!"
<< endl; + return -1; + } + + // read the first line + ssize_t bytes = 0; + if (0 < (bytes = read(fd, buf, 4096))) { + char* p = buf; + while (*p != '\n' && p != buf+bytes) { + if (*(p++) == ',') + fields++; + } + } + //reset file to begin + lseek(fd, 0, SEEK_SET); + + int index = 0; + while(0 < (bytes=read(fd, buf, buf_size))){ + //count lines for the buffer + char* p = buf; + if (ignore_blank_line) { + char* pre = p; + while (p != buf+bytes) + { + if(*(p++) == '\n') + { + if (p - pre == 1) + { + pre = p; + continue; + } + + lines++; + pre = p; + } + } + } else { + while (p != buf+bytes) + { + if(*(p++) == '\n') + { + lines++; + } + } + } + + if (bytes >=1 && bytes < buf_size && buf[bytes-1] !='\n') + ++lines; + } + + log_debug << "lines: "<< lines << ", fields: " << fields << endl; + return 0; +} diff --git a/cc/modules/protocol/mpc/snn/include/snn_opsets.h b/cc/modules/protocol/mpc/snn/include/snn_opsets.h index 9bd5bcb2..79a366ed 100644 --- a/cc/modules/protocol/mpc/snn/include/snn_opsets.h +++ b/cc/modules/protocol/mpc/snn/include/snn_opsets.h @@ -667,7 +667,7 @@ class ReconstructBit2PC : public OpBase { /* @note: this Polynomial is mostly for internal usage, - especially for complex functionalities, such as Log and Log1p, that + especially for complex funtionalities, such as Log and Log1p, that are implemented with polynomial interpolation. @attention: for now DO NOT use this directly if you are not sure. @@ -809,7 +809,7 @@ class Sigmoid : public OpBase { } int Run(const vector& a, vector& b, size_t size) { -#define USE_SIGMOID_VERSION 1 +#define USE_SIGMOID_VERSION 2 #if USE_SIGMOID_VERSION == 0 return RunChebyshevPoly(a, b, size); #elif USE_SIGMOID_VERSION == 1 diff --git a/cc/modules/protocol/mpc/snn/src/snn_protocol_ops.cpp b/cc/modules/protocol/mpc/snn/src/snn_protocol_ops.cpp index 87d95571..f823bfb9 100644 --- a/cc/modules/protocol/mpc/snn/src/snn_protocol_ops.cpp +++ b/cc/modules/protocol/mpc/snn/src/snn_protocol_ops.cpp @@ -186,7 +186,7 @@ int SnnProtocolOps::PrivateInput( vector out_vec; std::make_shared(_op_msg_id, net_io_)->Run(party_id, in_vec, out_vec); - snn_encode_(out_vec, out_str_vec); + snn_encode(out_vec, out_str_vec); return 0; } diff --git a/cc/modules/protocol/mpc/snn/tests/snn_truncation.cpp b/cc/modules/protocol/mpc/snn/tests/snn_truncation.cpp new file mode 100644 index 00000000..c6aa9295 --- /dev/null +++ b/cc/modules/protocol/mpc/snn/tests/snn_truncation.cpp @@ -0,0 +1,50 @@ +#include "snn__test.h" + +void run(int partyid) { + SNN_PROTOCOL_TEST_INIT(partyid); + // mimic 8^3 * 8^2 + vector X(100000, 8.0 * 8.0 * 8.0); + vector Y(100000, 8.0 * 8.0); + + vector EXPECT(100000, 32768.0); + size_t size = X.size(); + + // vector X = { 8.0, -8.0, 5, -5, 2, -2}; + // vector EXPECT = {0.9997, 0.00033, 0.9933, 0.00669, 0.8808, 0.1192}; + // size_t size = X.size(); + print_vec(X, 10, "X"); + print_vec(Y, 10, "Y"); + + string msgid("Mul OP(s) (share,share)"); + cout << __FUNCTION__ << " " << msgid << endl; + + vector strX, strY, strZ; + vector zZ(strZ.size()); + // In theory, the probability of this truncation error is about 1/(2^{64-(15+16+16)}, + // i.e. 1/(2^17) = 0.000008 + // we do this 10-timers to make this more likely to happen. 
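+  // (A rough recap of the arithmetic above, assuming the 64-bit ring and the 16-bit fixed-point
+  // fraction implied by the 15+16+16 figure: the product 8^3 * 8^2 = 32768 = 2^15 plus two 16-bit
+  // fractions occupies about 47 bits, so a truncation wrap-around happens with probability about
+  // 2^-(64-47) = 2^-17 ~= 7.6e-6 per element. Over ITER * size = 10 * 100000 = 1e6 multiplications
+  // we would therefore expect wrong_cnt to land around 7 or 8.)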
+ int wrong_cnt = 0; + int ITER = 10; + for(int kkk = 0; kkk < ITER; ++kkk) { + snn0.GetOps(msgid)->PrivateInput(0, X, strX); + snn0.GetOps(msgid)->PrivateInput(0, Y, strY); + snn0.GetOps(msgid)->Mul(strX, strY, strZ); + snn0.GetOps(msgid)->Reveal(strZ, zZ); + print_vec(zZ, 10, "SNN Mul plaintext:"); + print_vec(EXPECT, 10, "Mul expected:"); + for(int i = 0; i < size; ++i) { + auto inner = stol(zZ[i]); + auto expected = long(EXPECT[i]); + if (inner != expected) { + wrong_cnt++; + cout << i << "-th item wrong in iter: " << kkk << "!!!" << endl; + cout << "inner: " << inner << " <-> expected: " << expected << endl; + } + } + } + cout << "error num:" << wrong_cnt << endl; + cout << "probability in this case:" << (wrong_cnt * 1.0) / (ITER * size) << endl; + SNN_PROTOCOL_TEST_UNINIT(partyid); +} + +RUN_MPC_TEST(run); \ No newline at end of file diff --git a/cc/tf/secureops/data/secure_dataset_ops.cc b/cc/tf/secureops/data/secure_dataset_ops.cc new file mode 100644 index 00000000..1fa5e2bf --- /dev/null +++ b/cc/tf/secureops/data/secure_dataset_ops.cc @@ -0,0 +1,35 @@ +// ============================================================================== +// Copyright 2020 The LatticeX Foundation +// This file is part of the Rosetta library. +// +// The Rosetta library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Rosetta library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Rosetta library. If not, see . +// ============================================================================== + +#include "tensorflow/core/framework/common_shape_fns.h" +#include "tensorflow/core/framework/op.h" +#include "tensorflow/core/framework/op_def_builder.h" +#include "tensorflow/core/framework/shape_inference.h" + +namespace tensorflow { + +REGISTER_OP("PrivateTextLineDataset") + .Input("filenames: string") + .Input("compression_type: string") + .Input("buffer_size: int64") + .Input("data_owner: int64") + .Output("handle: variant") + .SetIsStateful(); + // shape function will depressed at present !!!! + +} \ No newline at end of file diff --git a/cc/tf/secureops/data/secure_textline_dataset.cc b/cc/tf/secureops/data/secure_textline_dataset.cc new file mode 100644 index 00000000..4ca90027 --- /dev/null +++ b/cc/tf/secureops/data/secure_textline_dataset.cc @@ -0,0 +1,445 @@ +// ============================================================================== +// Copyright 2020 The LatticeX Foundation +// This file is part of the Rosetta library. +// +// The Rosetta library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Rosetta library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. 
+// +// You should have received a copy of the GNU Lesser General Public License +// along with the Rosetta library. If not, see . +// ============================================================================== +#include "tensorflow/core/common_runtime/metrics.h" +#include "tensorflow/core/framework/dataset.h" +#include "tensorflow/core/framework/partial_tensor_shape.h" +#include "tensorflow/core/framework/tensor.h" +#include "tensorflow/core/lib/io/buffered_inputstream.h" +#include "tensorflow/core/lib/io/inputbuffer.h" +#include "tensorflow/core/lib/io/random_inputstream.h" +#include "tensorflow/core/lib/io/record_reader.h" +#include "tensorflow/core/lib/io/zlib_compression_options.h" +#include "tensorflow/core/lib/io/zlib_inputstream.h" +#include "cc/modules/protocol/public/protocol_manager.h" +#include "cc/modules/common/include/utils/logger.h" +#include +#include +#include +#include "cc/modules/common/include/utils/file_directory.h" + +using namespace rosetta; + +namespace tensorflow { +namespace data { +namespace { + +// See documentation in ../../ops/dataset_ops.cc for a high-level +// description of the following ops. + + +constexpr char kPrivateTextLineDatasetName[] = "PrivateTextLine"; + +// PrivateTextLineDatasetOp is mostly copy from tensorflow.data.TextLineDatasetOp with subtle +// change for reading line and setting up file stream +// the main perpose is to support secure cvs file reading +class PrivateTextLineDatasetOp : public DatasetOpKernel { + public: + using DatasetOpKernel::DatasetOpKernel; + PrivateTextLineDatasetOp(OpKernelConstruction* ctx) : DatasetOpKernel(ctx) { + const NodeDef& def = ctx->def(); + unique_op_name_ = def.name(); + + auto func_def = ctx->function_library()->GetFunctionLibraryDefinition(); + if (func_def) { + std::vector func_name_lists = func_def->ListFunctionNames(); + assert(func_name_lists.size() == 1); + unique_op_name_ = func_name_lists[0] + "/" + def.name(); + log_debug << "PrivateTextLineDataset op unique name_:" << unique_op_name_; + } + } + + void MakeDataset(OpKernelContext* ctx, DatasetBase** output) override { + const Tensor* filenames_tensor; + OP_REQUIRES_OK(ctx, ctx->input("filenames", &filenames_tensor)); + OP_REQUIRES( + ctx, filenames_tensor->dims() <= 1, + errors::InvalidArgument("`filenames` must be a scalar or a vector.")); + + string compression_type; + OP_REQUIRES_OK(ctx, ParseScalarArgument(ctx, "compression_type", + &compression_type)); + + int64 buffer_size = -1; + OP_REQUIRES_OK( + ctx, ParseScalarArgument(ctx, "buffer_size", &buffer_size)); + OP_REQUIRES( + ctx, buffer_size >= 0, + errors::InvalidArgument("`buffer_size` must be >= 0 (0 == default)")); + + io::ZlibCompressionOptions zlib_compression_options = + io::ZlibCompressionOptions::DEFAULT(); + if (compression_type == "ZLIB") { + zlib_compression_options = io::ZlibCompressionOptions::DEFAULT(); + } else if (compression_type == "GZIP") { + zlib_compression_options = io::ZlibCompressionOptions::GZIP(); + } else { + OP_REQUIRES(ctx, compression_type.empty(), + errors::InvalidArgument("Unsupported compression_type.")); + } + + if (buffer_size != 0) { + // Set the override size. 
+ zlib_compression_options.input_buffer_size = buffer_size; + } + + int32 data_owner = 0; + OP_REQUIRES_OK( + ctx, ParseScalarArgument(ctx, "data_owner", &data_owner)); + OP_REQUIRES( + ctx, data_owner >= 0 && data_owner <= 2, + errors::InvalidArgument("`data_owner` in {0,1,2}")); + + std::vector filenames; + filenames.reserve(filenames_tensor->NumElements()); + for (int i = 0; i < filenames_tensor->NumElements(); ++i) { + filenames.push_back(filenames_tensor->flat()(i)); + } + + *output = new Dataset(ctx, std::move(filenames), compression_type, + zlib_compression_options, data_owner, unique_op_name_); + } + + private: + class Dataset : public DatasetBase { + public: + Dataset(OpKernelContext* ctx, std::vector filenames, + const string& compression_type, + const io::ZlibCompressionOptions& options, int data_owner, string unique_op_name) + : DatasetBase(DatasetContext(ctx)), + filenames_(std::move(filenames)), + compression_type_(compression_type), + use_compression_(!compression_type.empty()), + data_owner_(data_owner), + unique_op_name_(unique_op_name), + options_(options) {} + + std::unique_ptr MakeIteratorInternal( + const string& prefix) const override { + return absl::make_unique(Iterator::Params{ + this, strings::StrCat(prefix, "::", kPrivateTextLineDatasetName)}, data_owner_); + } + + const DataTypeVector& output_dtypes() const override { + static DataTypeVector* dtypes = new DataTypeVector({DT_STRING}); + return *dtypes; + } + + const std::vector& output_shapes() const override { + static std::vector* shapes = + new std::vector({{}}); + return *shapes; + } + + string DebugString() const override { return "PrivateTextLineDatasetOp::Dataset"; } + + protected: + Status AsGraphDefInternal(SerializationContext* ctx, + DatasetGraphDefBuilder* b, + Node** output) const override { + Node* filenames = nullptr; + Node* compression_type = nullptr; + Node* buffer_size = nullptr; + TF_RETURN_IF_ERROR(b->AddVector(filenames_, &filenames)); + TF_RETURN_IF_ERROR(b->AddScalar(compression_type_, &compression_type)); + TF_RETURN_IF_ERROR( + b->AddScalar(options_.input_buffer_size, &buffer_size)); + TF_RETURN_IF_ERROR(b->AddDataset( + this, {filenames, compression_type, buffer_size}, output)); + return Status::OK(); + } + + private: + class Iterator : public DatasetIterator { + protected: + struct DataFileInfo { + int fields; + int lines; + char delim; + bool with_header; + DataFileInfo() : fields(0), lines(0), delim(','), with_header(false) {} + }; + + public: + explicit Iterator(const Params& params, int data_owner) + : data_owner_(data_owner), DatasetIterator(params) {} + + Status GetNextInternal(IteratorContext* ctx, + std::vector* out_tensors, + bool* end_of_sequence) override { + mutex_lock l(mu_); + do { + // We are currently processing a file, so try to read the next line. + // TODO: [kelvin] use fake line with zeros + // kelvin will custom buffer_input_stream and setup the stream in the beginning + if (is_setup_) { + string line_contents; + // Status s = buffered_input_stream_->ReadLine(&line_contents); + Status s = ReadNextLine(line_contents); + + if (s.ok()) { + // Produce the line as output. + metrics::RecordTFDataBytesRead(kPrivateTextLineDatasetName, + line_contents.size()); + out_tensors->emplace_back(ctx->allocator({}), DT_STRING, + TensorShape({})); + out_tensors->back().scalar()() = std::move(line_contents); + *end_of_sequence = false; + return Status::OK(); + } else if (!errors::IsOutOfRange(s)) { + // Report non-EOF errors to the caller. 
+ log_error << "got non-EOF error !"; + return s; + } + // We have reached the end of the current file, so maybe + // move on to next file. + ResetStreamsLocked(); + ++current_file_index_; + } + + // Iteration ends when there are no more files to process. + if (current_file_index_ == dataset()->filenames_.size()) { + *end_of_sequence = true; + return Status::OK(); + } + + TF_RETURN_IF_ERROR(SetupStreamsLocked(ctx->env())); + } while (true); + } + + protected: + std::shared_ptr CreateNode( + IteratorContext* ctx, model::Node::Args args) const override { + return model::MakeSourceNode(std::move(args)); + } + + Status SaveInternal(IteratorStateWriter* writer) override { + mutex_lock l(mu_); + TF_RETURN_IF_ERROR(writer->WriteScalar(full_name("current_file_index"), + current_file_index_)); + + // `buffered_input_stream_` is empty if + // 1. GetNext has not been called even once. + // 2. All files have been read and iterator has been exhausted. + if (buffered_input_stream_) { + TF_RETURN_IF_ERROR(writer->WriteScalar( + full_name("current_pos"), buffered_input_stream_->Tell())); + } + return Status::OK(); + } + + Status RestoreInternal(IteratorContext* ctx, + IteratorStateReader* reader) override { + mutex_lock l(mu_); + ResetStreamsLocked(); + int64 current_file_index; + TF_RETURN_IF_ERROR(reader->ReadScalar(full_name("current_file_index"), + ¤t_file_index)); + current_file_index_ = size_t(current_file_index); + // The key "current_pos" is written only if the iterator was saved + // with an open file. + if (reader->Contains(full_name("current_pos"))) { + int64 current_pos; + TF_RETURN_IF_ERROR( + reader->ReadScalar(full_name("current_pos"), ¤t_pos)); + + TF_RETURN_IF_ERROR(SetupStreamsLocked(ctx->env())); + TF_RETURN_IF_ERROR(buffered_input_stream_->Seek(current_pos)); + } + return Status::OK(); + } + + private: + // Sets up reader streams to read from the file at `current_file_index_`. + Status SetupStreamsLocked(Env* env) EXCLUSIVE_LOCKS_REQUIRED(mu_) { + if (current_file_index_ >= dataset()->filenames_.size()) { + return errors::InvalidArgument( + "current_file_index_:", current_file_index_, + " >= filenames_.size():", dataset()->filenames_.size()); + } + + if (IsDataOwner()) { + // Actually move on to next file. + TF_RETURN_IF_ERROR( + env->NewRandomAccessFile(dataset()->filenames_[current_file_index_], &file_)); + input_stream_ = absl::make_unique(file_.get(), false); + + if (dataset()->use_compression_) { + zlib_input_stream_ = absl::make_unique( + input_stream_.get(), dataset()->options_.input_buffer_size, + dataset()->options_.input_buffer_size, dataset()->options_); + buffered_input_stream_ = absl::make_unique( + zlib_input_stream_.get(), dataset()->options_.input_buffer_size, false); + } else { + buffered_input_stream_ = absl::make_unique( + input_stream_.get(), dataset()->options_.input_buffer_size, false); + } + + log_debug << "data owner: " << ProtocolManager::Instance()->GetProtocol()->GetPartyId() << " and setup stream"; + } else { + log_debug << "not data owner: " << ProtocolManager::Instance()->GetProtocol()->GetPartyId() << " with setup fake stream"; + } + + // [kelvin] Get data file info, eg. 
fields, lines, delmiter + if (ExchangeDataFileInfo() != 0) { + return errors::InvalidArgument( + "current_file_index_:", current_file_index_, + " Exchange data file description info failed"); + } + + is_setup_ = true; + return Status::OK(); + } + + Status ReadNextLine(string& line_contents) { + if (!is_setup_) { + log_error << "not setup stream to iterator!"; + return Status(error::FAILED_PRECONDITION, "not setup stream"); + } + + Status s = Status::OK(); + line_contents.clear(); + if (IsDataOwner() && buffered_input_stream_) { + bool is_blank = true; + do { + s = buffered_input_stream_->ReadLine(&line_contents); + if (errors::IsOutOfRange(s) || current_file_line_count_ == data_file_info_.lines) + return Status(error::OUT_OF_RANGE, "text file out of range"); + + if (line_contents.find_first_not_of(' ') != std::string::npos)// not space line + { + current_file_line_count_++; + break; + } + } while(is_blank); + + // log_debug << "owner read file line count:" << current_file_line_count_ << ", contents sizes:" << line_contents.size(); + } + else { + // make a fake line + if (current_file_line_count_ == data_file_info_.lines) + return Status(error::OUT_OF_RANGE, "text file out of range"); + + line_contents.resize(data_file_info_.fields*2-1); + size_t i = 0; + for ( ; i < data_file_info_.fields-1; ++i) { + line_contents[2*i] = '0'; + line_contents[2*i+1] = data_file_info_.delim; + } + + if (i == data_file_info_.fields-1) + { + line_contents[data_file_info_.fields*2-2] = '0'; + ++current_file_line_count_; + } + + // log_debug << "no-owner read file line count:" << current_file_line_count_ << ", contents sizes:" << line_contents.size(); + } + + return s; + } + + int ExchangeDataFileInfo() { + int party_id = ProtocolManager::Instance()->GetProtocol()->GetPartyId(); + log_debug << "to exchange data file info... party_id: " << party_id; + + // get unique msg key + std::stringstream msg_key; + msg_key << "/SecureTextDataset/" << current_file_index_ << "/" << dataset()->unique_op_name_; + log_debug << "SecureTextDataset op msg key:" << msg_key.str() << endl; + + // assemble data file info + string result, msg; + if (IsDataOwner()) { + DataFileInfo file_info; + string filename = dataset()->filenames_[current_file_index_]; + get_file_lines_fields(filename, ',', file_info.lines, file_info.fields); + file_info.with_header = false; + file_info.delim = ','; + msg.append((char*)&file_info, sizeof(file_info)); + log_debug << "get_file_lines_fields: file lines=" << file_info.lines << ", file fields=" << file_info.fields << endl; + } + else { + msg.resize(sizeof(data_file_info_)); + } + + // dataowner send file info to peers and non-dataowner recv file info to peers + if (0 != ProtocolManager::Instance()->GetProtocol()->GetOps(msg_key.str())->Broadcast(data_owner_, msg, result)) + { + log_error << "call Broadcast failed, party: " << party_id; + return -1; + } + + // save file info + if (IsDataOwner()) + memcpy(&data_file_info_, msg.data(), sizeof(data_file_info_)); + else + memcpy(&data_file_info_, result.data(), sizeof(data_file_info_)); + + log_info << "data file info: " << "lines: "<< data_file_info_.lines << ", fields: " << data_file_info_.fields << ", !!! we only support textline for CSV !!! "; + return 0; + } + + bool IsDataOwner() const { + return data_owner_ == ProtocolManager::Instance()->GetProtocol()->GetPartyId(); + } + + // Resets all reader streams. 
+ void ResetStreamsLocked() EXCLUSIVE_LOCKS_REQUIRED(mu_) { + if (IsDataOwner() && input_stream_) { + input_stream_.reset(); + zlib_input_stream_.reset(); + buffered_input_stream_.reset(); + file_.reset(); + } + is_setup_ = false; + } + + private: + mutex mu_; + std::unique_ptr input_stream_ + GUARDED_BY(mu_); + std::unique_ptr zlib_input_stream_ GUARDED_BY(mu_); + std::unique_ptr buffered_input_stream_ + GUARDED_BY(mu_); + size_t current_file_index_ GUARDED_BY(mu_) = 0; + size_t current_file_line_count_ GUARDED_BY(mu_) = 0; + DataFileInfo data_file_info_; + std::unique_ptr file_ + GUARDED_BY(mu_); // must outlive input_stream_ + int data_owner_; + bool is_setup_ GUARDED_BY(mu_) = false; + };//Dataset::Iterator + + const std::vector filenames_; + const string compression_type_; + const bool use_compression_; + const io::ZlibCompressionOptions options_; + const int data_owner_; + string unique_op_name_; + };//Dataset + + string unique_op_name_; +};//PrivateTextLineDatasetOp + +REGISTER_KERNEL_BUILDER(Name("PrivateTextLineDataset").Device(DEVICE_CPU), + PrivateTextLineDatasetOp); + +} +} +} + diff --git a/cc/tf/secureops/secure_base_kernel.h b/cc/tf/secureops/secure_base_kernel.h index d8cfa0dd..c9f5d325 100644 --- a/cc/tf/secureops/secure_base_kernel.h +++ b/cc/tf/secureops/secure_base_kernel.h @@ -31,6 +31,7 @@ using namespace std; #include "tensorflow/core/framework/types.h" #include "tensorflow/core/framework/op.h" #include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/framework/function.h" #include "tensorflow/core/framework/common_shape_fns.h" #include "tensorflow/core/framework/tensor_shape.h" #include "tensorflow/core/framework/kernel_def.pb.h" @@ -104,6 +105,23 @@ class SecureOpKernel : public OpKernel { msg_id_ = msg_id_t(def.name()); log_debug << "SecureOpKernel msgid:" << msg_id(); + + //----------------------------------------------- + //Deal with PrivateInput op in decode function + auto func_lib = context->function_library(); + if (func_lib) { + auto func_def = func_lib->GetFunctionLibraryDefinition(); + if (func_def) { + std::vector func_name_lists = func_def->ListFunctionNames(); + if (func_name_lists.size() == 1 && !strcmp(def.name().c_str(), "PrivateInput")) { + string op_unique_name = func_name_lists[0] + "/" + def.name(); + msg_id_ = msg_id_t(op_unique_name); + log_debug << "New SecureOpKernel msgid:" << msg_id(); + } + } + } + //----------------------------------------------- + } void Compute(OpKernelContext* context) override { DEBUG_PRINT_BEFORE(context); diff --git a/cc/tf/secureops/secure_convert.cc b/cc/tf/secureops/secure_convert.cc index c0bdbeb8..9210929e 100755 --- a/cc/tf/secureops/secure_convert.cc +++ b/cc/tf/secureops/secure_convert.cc @@ -210,6 +210,96 @@ class SecureToTfOp : public SecureOpKernel { } }; +template +class PrivateInputOp : public SecureOpKernel { + public: + explicit PrivateInputOp(OpKernelConstruction* ctx) : SecureOpKernel(ctx) { + OP_REQUIRES_OK(ctx, ctx->GetAttr("data_owner_", &data_owner_)); + log_debug << "construct private input T op, data_owner_: " << data_owner_ << endl; + } + + void Compute(OpKernelContext* context) override { + log_debug << "private_input OpKernel compute ..." 
<< endl; + const Tensor *input_tensor, *data_owner; + OP_REQUIRES_OK(context, + context->input("input", &input_tensor)); + OP_REQUIRES_OK(context, + context->input("data_owner", &data_owner)); + Tensor* output_tensor = nullptr; + OP_REQUIRES_OK(context, + context->allocate_output("output", input_tensor->shape(), &output_tensor)); + auto output_flat = output_tensor->flat(); + + const auto& input_flat = input_tensor->flat(); + vector inputs(input_flat.size()); + for (int i = 0; i < input_flat.size(); ++i) { + inputs[i] = double(input_flat(i)); + } + + const auto& data_owner_flat = data_owner->flat(); + data_owner_ = data_owner_flat(0); + + // PrivateInput input + vector outputs(input_flat.size()); + ProtocolManager::Instance()->GetProtocol()->GetOps(msg_id().str())->PrivateInput(data_owner_, inputs, outputs); + + for (int i = 0; i < input_flat.size(); ++i) { + output_flat(i) = outputs[i]; + } + + log_debug << "run PrivateInput op ok." << endl; + } + + private: + int data_owner_; +}; + +template <> +class PrivateInputOp : public SecureOpKernel { + public: + explicit PrivateInputOp(OpKernelConstruction* ctx) : SecureOpKernel(ctx) { + // OP_REQUIRES_OK(ctx, ctx->GetAttr("data_owner_", &data_owner_)); + // log_debug << "construct private input string op, data_owner_: " << data_owner_ << endl; + } + + void Compute(OpKernelContext* context) override { + log_debug << "private_input OpKernel compute ..." << endl; + const Tensor *input_tensor, *data_owner; + OP_REQUIRES_OK(context, + context->input("input", &input_tensor)); + OP_REQUIRES_OK(context, + context->input("data_owner", &data_owner)); + + Tensor* output_tensor = nullptr; + OP_REQUIRES_OK(context, + context->allocate_output("output", input_tensor->shape(), &output_tensor)); + auto output_flat = output_tensor->flat(); + + const auto& input_flat = input_tensor->flat(); + vector inputs(input_flat.size()); + for (int i = 0; i < input_flat.size(); ++i) { + inputs[i] = std::stod(input_flat(i)); + } + + const auto& data_owner_flat = data_owner->flat(); + data_owner_ = data_owner_flat(0); + + // PrivateInput input + vector outputs(input_flat.size()); + ProtocolManager::Instance()->GetProtocol()->GetOps(msg_id().str())->PrivateInput(data_owner_, inputs, outputs); + + for (int i = 0; i < input_flat.size(); ++i) { + output_flat(i) = outputs[i]; + } + + log_debug << "run PrivateInput op ok." << endl; + } + + private: + int data_owner_; +}; + + // Registers the currently supported output types. 
#define REGISTER(type) \ REGISTER_KERNEL_BUILDER( \ @@ -221,6 +311,7 @@ REGISTER(int64); REGISTER(string); #undef REGISTER +// tf_to_secure kernel REGISTER_KERNEL_BUILDER( Name("TfToSecure").Device(DEVICE_CPU).TypeConstraint("dtype"), TfToSecureOp); REGISTER_KERNEL_BUILDER( @@ -230,9 +321,22 @@ REGISTER_KERNEL_BUILDER( REGISTER_KERNEL_BUILDER( Name("TfToSecure").Device(DEVICE_CPU).TypeConstraint("dtype"), TfToSecureOp); -// REGISTER_KERNEL_BUILDER( -// Name("SecureToTf").Device(DEVICE_CPU).TypeConstraint("dtype"), SecureToTfOp); -// REGISTER_KERNEL_BUILDER( -// Name("SecureToTf").Device(DEVICE_CPU).TypeConstraint("dtype"), SecureToTfOp); +// private_input kernel +REGISTER_KERNEL_BUILDER( + Name("PrivateInput")\ + .Device(DEVICE_CPU)\ + .TypeConstraint("dtype"), PrivateInputOp); +REGISTER_KERNEL_BUILDER( + Name("PrivateInput")\ + .Device(DEVICE_CPU)\ + .TypeConstraint("dtype"), PrivateInputOp); +REGISTER_KERNEL_BUILDER( + Name("PrivateInput")\ + .Device(DEVICE_CPU)\ + .TypeConstraint("dtype"), PrivateInputOp); +REGISTER_KERNEL_BUILDER( + Name("PrivateInput")\ + .Device(DEVICE_CPU)\ + .TypeConstraint("dtype"), PrivateInputOp); } \ No newline at end of file diff --git a/cc/tf/secureops/secure_convert_ops.cc b/cc/tf/secureops/secure_convert_ops.cc index 5a6ca556..cba5bd3a 100755 --- a/cc/tf/secureops/secure_convert_ops.cc +++ b/cc/tf/secureops/secure_convert_ops.cc @@ -35,3 +35,10 @@ REGISTER_OP("SecureToTf") .SetIsStateful(); //.SetShapeFn(::tensorflow::shape_inference::UnchangedShape); +REGISTER_OP("PrivateInput") + .Attr("dtype: {int32, int64, float, double, string}") + .Attr("T: {int32, int64}") + .Input("input: dtype") + .Input("data_owner: T") + .Output("output: string"); + diff --git a/cc/tf/secureops/tests/secure_op_test.py b/cc/tf/secureops/tests/secure_op_test.py index a1038e12..5eaf90eb 100644 --- a/cc/tf/secureops/tests/secure_op_test.py +++ b/cc/tf/secureops/tests/secure_op_test.py @@ -26,7 +26,7 @@ def __getattr__(self, attr): sys.stdout = ZeroBufferOut(sys.stdout) -basepath = os.path.abspath(os.path.dirname(__file__)) + "/../../../../build128/lib" +basepath = os.path.abspath(os.path.dirname(__file__)) + "/../../../../build/lib" rosettapth = ( os.path.abspath(os.path.dirname(__file__)) + "/../../../../build/lib.linux-x86_64-3.6/latticex" @@ -135,6 +135,61 @@ def test_reveal(): print("----- test_reveal (OK) -----") +def test_private_input(): + print("----- test_private_input -----") + #in1 = [[1,2,3], [4,5,6]] + in1 = [["1","2","3"], ["4","5","6"]] + v11 = rtt.private_input(in1, data_owner=0) + v12 = rtt.private_input(in1, data_owner=0) + v13 = rtt.private_input(in1, data_owner=0) + + v21 = rtt.private_input(in1, data_owner=1) + v22 = rtt.private_input(in1, data_owner=1) + v23 = rtt.private_input(in1, data_owner=1) + + v31 = rtt.private_input(in1, data_owner=2) + v32 = rtt.private_input(in1, data_owner=2) + v33 = rtt.private_input(in1, data_owner=2) + + ret11 = rtt.secure_reveal(v11, 7) + ret12 = rtt.secure_reveal(v12, 7) + ret13 = rtt.secure_reveal(v13, 7) + ret21 = rtt.secure_reveal(v21, 7) + ret22 = rtt.secure_reveal(v22, 7) + ret23 = rtt.secure_reveal(v23, 7) + ret31 = rtt.secure_reveal(v31, 7) + ret32 = rtt.secure_reveal(v32, 7) + ret33 = rtt.secure_reveal(v33, 7) + result1 = create_run_session(ret11) # , ret12, ret13, ret21, ret22, ret23, ret31, ret32, ret33]) # , ret2, ret3, ret4, ret5, ret6, ret7, ret8, ret9 + print("reveal result1: ", result1) + + result2 = create_run_session(ret12) + print("reveal result2: ", result2) + + result3 = create_run_session(ret13) + 
print("reveal result4: ", result3) + + result4 = create_run_session(ret21) + print("reveal result3: ", result4) + + result5 = create_run_session(ret22) + print("reveal result5: ", result5) + + result6 = create_run_session(ret23) + print("reveal result6: ", result6) + + result7 = create_run_session(ret31) + print("reveal result7: ", result7) + + result8 = create_run_session(ret32) + print("reveal result8: ", result8) + + result9 = create_run_session(ret33) + print("reveal result9: ", result9) + + print("!! expect all to be: [[1,2,3],[4,5,6]] !!") + print("----- test_private_input (OK) -----") + def test_add(): print("----- test_add -----") assert rtt @@ -566,10 +621,11 @@ def test_negative(): def test_all_protocol_ops(): - test_reveal() # pass + test_private_input() + # test_reveal() # pass # test_add() # pass # test_sub() # pass - test_mul() # pass + # test_mul() # pass # test_matmul() # pass # test_div() # pass # test_div_const() # pass @@ -584,8 +640,8 @@ def test_all_protocol_ops(): # test_pow2() # pass ######################### - test_log() # not pass for snn - test_log1p() # not pass for snn + # test_log() # not pass for snn + # test_log1p() # not pass for snn ######################### # test_square() # pass @@ -617,8 +673,8 @@ def test_all_protocol_ops(): test_all_protocol_ops() print("--- {} ops test ok ----".format(protocol)) - protocol = "Helix" - print("--- to activate ", protocol) - prot_handler.activate(protocol, cfg_content) - test_all_protocol_ops() + # protocol = "Helix" + # print("--- to activate ", protocol) + # prot_handler.activate(protocol, cfg_content) + # test_all_protocol_ops() print("--- {} ops test ok ----".format(protocol)) diff --git a/cc/tf/secureops/tests/secure_opname_test.py b/cc/tf/secureops/tests/secure_opname_test.py index ebecb711..1099eb91 100644 --- a/cc/tf/secureops/tests/secure_opname_test.py +++ b/cc/tf/secureops/tests/secure_opname_test.py @@ -16,7 +16,7 @@ "secure_div", "secure_truediv", "secure_realdiv", "secure_floordiv", "secure_reduce_sum", "secure_reduce_mean", "secure_reduce_max","secure_reduce_min", "secure_negative", "secure_apply_gradient_descent", - "secure_save_v2", "secure_to_tf", "tf_to_secure", + "secure_save_v2", "secure_to_tf", "tf_to_secure", "private_input", "secure_less", "secure_less_equal", "secure_not_equal", "secure_equal", "secure_greater", "secure_greater_equal", "secure_sigmoid", "secure_relu", "secure_sigmoid_cross_entropy" ] diff --git a/clean_pip.sh b/clean_pip.sh index f5ff3609..1add2ead 100755 --- a/clean_pip.sh +++ b/clean_pip.sh @@ -1,5 +1,5 @@ rm python/latticex_rosetta.egg-info -rf -rm python/latticex/lib*.so +rm python/latticex/lib*.so -rf rm python/latticex/lib128 -rf rm dist -rf diff --git a/doc/API_DOC.md b/doc/API_DOC.md index 64664b52..9f817d15 100644 --- a/doc/API_DOC.md +++ b/doc/API_DOC.md @@ -15,6 +15,7 @@ - [`private_input(party_id: int, input_val)`](#private_inputparty_id-int-input_val) - [`private_console_input(party_id: int)`](#private_console_inputparty_id-int) - [`class PrivateDataSet`](#class-privatedataset) + - [`class PrivateTextLineDataset`](#class-privatetextlinedataset) - [Operation API](#operation-api) - [Terms and definition](#terms-and-definition) - [Common notes](#common-notes) @@ -51,6 +52,7 @@ - [`SecureReveal(a, reveal_party=7)`](#securereveala-reveal_party7) - [I/O SecureOps](#io-secureops) - [`SecureSaveV2(prefix, tensor_names, shape_and_slices, tensors, name=None)`](#securesavev2prefix-tensor_names-shape_and_slices-tensors-namenone) + - [`PrivateInput(x, data_owner, 
name=None)`](#privateinputx-data_owner-namenone)
 
 ## Overview
 
@@ -179,6 +181,26 @@ The APIs can mainly be classified into two types: 'Control API', which should be
 
 The combination of the above two functions.
 
+#### `class PrivateTextLineDataset`
+
+  A private Dataset comprising lines from one or more text files.
+
+  It is used in the same way as TextLineDataset, but takes one extra parameter, `data_owner`, which indicates which party holds the private data.
+
+  For example, assuming P0 owns the private data, it is instantiated like this:
+  ```python
+  file_x = ...
+  dataset_x0 = rtt.PrivateTextLineDataset(
+      file_x, data_owner=0)  # P0 holds the file data
+  ```
+
+  For example, assuming P1 owns the private data, it is instantiated like this:
+  ```python
+  file_x = ...
+  dataset_x1 = rtt.PrivateTextLineDataset(
+      file_x, data_owner=1)  # P1 holds the file data
+  ```
+
 ## Operation API
 
@@ -799,4 +821,16 @@ We will try to represent each `SecureOp` interface in an clear and easy-to-under
 
 *NOTE*: Every party must have the same configured value so that the system call perform the correct actions. **This Configuration is important due to its output values, which may be sensitive, are in plaintext . So be cautious and reach a consensus among all the parties.**
 
+#### `PrivateInput(x, data_owner, name=None)`
+
+  Defines a private input; it represents a `private` input owned by the specified party and fed into the graph.
+
+  **Args:**
+  - **`x`**: The data from the data owner; supported data types: int32, int64, float, double, string.
+  - **`data_owner`**: The private data owner. data_owner = 0 means party P0 holds the data, data_owner = 1 means party P1 holds the data, data_owner = 2 means party P2 holds the data.
+  - **`name`**: A name for the operation (optional).
+
+  **Returns:**
+  A `tensor` that has the same type and shape as `x`.
+
diff --git a/doc/API_DOC_CN.md b/doc/API_DOC_CN.md
index 50f2e6e2..f68abf64 100644
--- a/doc/API_DOC_CN.md
+++ b/doc/API_DOC_CN.md
@@ -38,6 +38,7 @@
   - [`SecureReveal(a, reveal_party=-1)`](#securereveala-reveal_party-1)
 - [I/O SecureOps](#io-secureops)
   - [`SecureSaveV2(prefix, tensor_names, shape_and_slices, tensors, name=None)`](#securesavev2prefix-tensor_names-shape_and_slices-tensors-namenone)
+  - [`PrivateInput(x, data_owner, name=None)`](#privateinputx-data_owner-namenone)
 
 ## 概述
 
@@ -650,3 +651,17 @@
 ​ 逻辑图上创建该IO逻辑的算子.
 *注意*: 每一个参与的计算法都必须要有同样的配置值,系统才可以正确的进行对应的操作。 **因为输出的文件中可以配置为保存明文,这一配置值很重要。所以使用此接口时务必小心,使用此接口前,在参与计算的多方之间对此要达成一致.**
+
+ #### `PrivateInput(x, data_owner, name=None)`
+
+ 定义一个 "私有" 输入, 这个 "私有" 的输入,由指定 `data_owner` 端拥有并输入到计算图中。
+
+ **参数:**
+ - **`x`**: 数据拥有方持有的数据,支持的数据类型: `int32`,`int64`,`float`, `double`, `string`。
+ - **`data_owner`**: 指定私有数据拥有方,data_owner = 0 代表 P0 方拥有私有数据,data_owner = 1 代表 P1 方拥有私有数据,data_owner = 2 代表 P2 方拥有私有数据。
+ - **`name(可选)`**: 指定的该操作的名称,默认为`None`。
+
+ **返回值:**
+ 一个 `Tensor`。类型和形状与`x`相同。
+
+ *注意*: 返回值是一个跟 x 形状和类型一样的共享值。
diff --git a/doc/TUTORIALS.md b/doc/TUTORIALS.md
index ec8e4f69..55514661 100644
--- a/doc/TUTORIALS.md
+++ b/doc/TUTORIALS.md
@@ -17,6 +17,7 @@
   - [Model Saving](#model-saving)
   - [Model Loading and Prediction](#model-loading-and-prediction)
   - [Logistic Regression](#logistic-regression)
+  - [Support big data sets](#support-big-data-sets)
 - [Conclusion](#conclusion)
 - [Additional Notes](#additional-notes)
   - [Dataset Description](#dataset-description)
@@ -821,6 +822,50 @@ The following figure is about the error comparison between the predicted values
 
 ![](./_static/tutorials/logistic_regression_stat-Y-diff4.png)
 
+### Support big data sets
+
+The linear regression and logistic regression models above load the entire dataset into memory and then draw batches from it in order during training. As the dataset grows, loading it into memory in one go becomes impractical.
+
+Mainstream plaintext AI frameworks such as TensorFlow are aware of this problem and provide solutions: TensorFlow offers the Dataset APIs to build low-memory, complex, reusable data pipelines. Since Rosetta uses TensorFlow as its backend, these pipelines can be reused with minor modifications.
+
+We use the logistic regression model as an example to illustrate how to train a model on a large dataset.
+
+For the complete TensorFlow version of the code, please refer to [tf-ds-lr.py](../example/tutorials/code/tf-ds-lr.py).
+
+For the complete Rosetta version of the code, please refer to [rtt-ds-lr.py](../example/tutorials/code/rtt-ds-lr.py).
+
+Analysis of the code in tf-ds-lr.py and rtt-ds-lr.py reveals two main differences (a sketch of the combined pipeline is given below).
+
+1. Creating the text line dataset: TensorFlow uses the TextLineDataset class, while Rosetta uses the PrivateTextLineDataset class.
+   The code used in TensorFlow is as follows:
+   ```py
+   dataset_x = tf.data.TextLineDataset(file_x)
+   ...
+   ```
+   The code used in Rosetta is as follows:
+   ```py
+   dataset_x0 = rtt.PrivateTextLineDataset(
+       file_x, data_owner=0)  # P0 holds the file data
+   ...
+   ```
+
+2. The decode functions are implemented differently. The TensorFlow version of the decode function splits a row into its fields and then converts the fields to floating-point numbers, while the Rosetta version also first splits a row into its fields and then calls `PrivateInput` to share the data.
+   The code used in TensorFlow is as follows:
+   ```py
+   # dataset decode
+   def decode_x(line):
+       fields = tf.string_split([line], ',').values
+       fields = tf.string_to_number(fields, tf.float64)
+       return fields
+   ```
+   The code used in Rosetta is as follows:
+   ```py
+   # dataset decode
+   def decode_p0(line):
+       fields = tf.string_split([line], ',').values
+       fields = rtt.PrivateInput(fields, data_owner=0)  # P0 holds the file data
+       return fields
+   ```
+
 ## Conclusion
 
 That's all.
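+
+As a concrete follow-up to the two differences above, the following sketch shows how the two privately decoded feature datasets are combined per batch. It mirrors the flow of [rtt-ds-lr.py](../example/tutorials/code/rtt-ds-lr.py); `decode_p0`/`decode_p1` are the decode functions shown above and `BATCH_SIZE` is a placeholder:
+```py
+# one pipeline per data owner, decoded into shares by the owner of each file
+dataset_x0 = rtt.PrivateTextLineDataset(file_x, data_owner=0).map(decode_p0).batch(BATCH_SIZE)
+dataset_x1 = rtt.PrivateTextLineDataset(file_x, data_owner=1).map(decode_p1).batch(BATCH_SIZE)
+
+iter_x0 = dataset_x0.make_initializable_iterator()
+iter_x1 = dataset_x1.make_initializable_iterator()
+X0, X1 = iter_x0.get_next(), iter_x1.get_next()
+
+# each party contributes a subset of the feature columns, so join them column-wise
+X = tf.concat([X0, X1], axis=1)
+```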
diff --git a/doc/TUTORIALS_CN.md b/doc/TUTORIALS_CN.md index 1759bc4f..a25af6c4 100644 --- a/doc/TUTORIALS_CN.md +++ b/doc/TUTORIALS_CN.md @@ -1,29 +1,28 @@ - -- [概述](#%e6%a6%82%e8%bf%b0) -- [安装部署](#%e5%ae%89%e8%a3%85%e9%83%a8%e7%bd%b2) -- [快速入门](#%e5%bf%ab%e9%80%9f%e5%85%a5%e9%97%a8) -- [安全多方计算](#%e5%ae%89%e5%85%a8%e5%a4%9a%e6%96%b9%e8%ae%a1%e7%ae%97) - - [百万富翁](#%e7%99%be%e4%b8%87%e5%af%8c%e7%bf%81) - - [tensorflow 版本](#tensorflow-%e7%89%88%e6%9c%ac) - - [rosetta 版本](#rosetta-%e7%89%88%e6%9c%ac) -- [隐私机器学习](#%e9%9a%90%e7%a7%81%e6%9c%ba%e5%99%a8%e5%ad%a6%e4%b9%a0) - - [线性回归](#%e7%ba%bf%e6%80%a7%e5%9b%9e%e5%bd%92) - - [tensorflow 版本](#tensorflow-%e7%89%88%e6%9c%ac-1) - - [rosetta 基础版](#rosetta-%e5%9f%ba%e7%a1%80%e7%89%88) - - [rosetta 版本-Reveal](#rosetta-%e7%89%88%e6%9c%ac-reveal) - - [对比与评估 1](#%e5%af%b9%e6%af%94%e4%b8%8e%e8%af%84%e4%bc%b0-1) - - [对比与评估 2](#%e5%af%b9%e6%af%94%e4%b8%8e%e8%af%84%e4%bc%b0-2) - - [模型保存](#%e6%a8%a1%e5%9e%8b%e4%bf%9d%e5%ad%98) - - [模型加载与预测](#%e6%a8%a1%e5%9e%8b%e5%8a%a0%e8%bd%bd%e4%b8%8e%e9%a2%84%e6%b5%8b) - - [逻辑回归](#%e9%80%bb%e8%be%91%e5%9b%9e%e5%bd%92) -- [结语](#%e7%bb%93%e8%af%ad) -- [附加](#%e9%99%84%e5%8a%a0) - - [数据集说明](#%e6%95%b0%e6%8d%ae%e9%9b%86%e8%af%b4%e6%98%8e) +- [概述](#概述) +- [安装部署](#安装部署) +- [快速入门](#快速入门) +- [安全多方计算](#安全多方计算) + - [百万富翁](#百万富翁) + - [tensorflow 版本](#tensorflow-版本) + - [rosetta 版本](#rosetta-版本) +- [隐私机器学习](#隐私机器学习) + - [线性回归](#线性回归) + - [tensorflow 版本](#tensorflow-版本-1) + - [rosetta 基础版](#rosetta-基础版) + - [rosetta 版本-Reveal](#rosetta-版本-reveal) + - [对比与评估 1](#对比与评估-1) + - [对比与评估 2](#对比与评估-2) + - [模型保存](#模型保存) + - [模型加载与预测](#模型加载与预测) + - [逻辑回归](#逻辑回归) + - [支持超大数据集](#支持超大数据集) +- [结语](#结语) +- [附加](#附加) + - [数据集说明](#数据集说明) ## 概述 - ## 安装部署 如果你还没有搭建 `rosetta` 环境,请参考[部署文档](./DEPLOYMENT_CN.md)。 @@ -838,6 +837,50 @@ rosetta: ![](./_static/tutorials/logistic_regression_stat-Y-diff4.png) +### 支持超大数据集 + +以上的线性回归、逻辑回归模型都是把数据集全部加载到内存中,然后依次按批量取出来进行训练,随着数据集规模越来越大,一次性把数据集加载到内存已经变的不现实。 + +TensorFlow 等主流明文 AI 框架已经意识并提供解决方案,TensorFlow 中提供相关的 Dataset APIs 来构建低内存消耗的、复杂的、可复用的数据管道,由于 Rosetta 使用 TensorFlow 作为后端,因此稍微修改即可复用。 + +我们使用逻辑回归模型作为例子来说明如何使用大数据集进行训练。 + +TensorFlow 完整代码参考 [tf-ds-lr.py](../example/tutorials/code/tf-ds-lr.py) 。 + +Rosetta 完整代码参考 [rtt-ds-lr.py](../example/tutorials/code/rtt-ds-lr.py)。 + +仔细分析 tf-ds-lr.py 和 rtt-ds-lr.py 中的代码,主要有两个不同点: +1. 创建文本行数据集,TensorFlow 中使用 TextLineDataset 类,而 Rosetta 中使用 PrivateTextLineDataset 类。 + TensorFlow 中代码如下: + ```py + dataset_x = tf.data.TextLineDataset(file_x) + ... + ``` + Rosetta 中代码如下: + ```py + dataset_x0 = rtt.PrivateTextLineDataset( + file_x, data_owner=0) # P0 hold the file data + ... + ``` + +2. 
Decode 函数实现不一样,TensorFlow 版本中 Decode 函数中把行筛分为对应的字段后,然后把筛分后的字段转换为数值,而 Rosetta 版本中的 Decode 函数首先也是把行筛分为对应的字段后,然后调用 `PrivateInput` 进行数据分享。 + TensorFlow 中代码如下: + ```py + # dataset decode + def decode_x(line): + fields = tf.string_split([line], ',').values + fields = tf.string_to_number(fields, tf.float64) + return fields + ``` + Rosetta 中代码如下: + ```py + # dataset decode + def decode_p0(line): + fields = tf.string_split([line], ',').values + fields = rtt.PrivateInput(fields, data_owner=0) # P0 hold the file data + return fields + ``` + ## 结语 OK,你现在已经完全掌握了 `Rosetta` 的使用了,赶紧找一个真实场景玩玩。 diff --git a/example/tutorials/code/rtt-ds-lr.py b/example/tutorials/code/rtt-ds-lr.py new file mode 100644 index 00000000..f5e88b4c --- /dev/null +++ b/example/tutorials/code/rtt-ds-lr.py @@ -0,0 +1,127 @@ +# rosetta LR with sample based (horizonal federated learning) +import latticex.rosetta as rtt # difference from tensorflow +import math +import os +import sys +import csv +import tensorflow as tf +import numpy as np +import pandas as pd +import time +import argparse + +rtt.activate("SecureNN") +mpc_player_id = rtt.py_protocol_handler.get_party_id() +print("mpc_player_id:", mpc_player_id) + +np.set_printoptions(suppress=True) +np.random.seed(0) + +EPOCHES = 1 +BATCH_SIZE = 32 +learning_rate = 0.03125 +DIM_NUM = 11 +ROW_NUM = 1279 + +file_x = "" +file_y = "" +filex_name = "cls_train_x.csv" +filey_name = "cls_train_y.csv" + +file_x = "../dsets/P" + str(mpc_player_id) + "/" + filex_name +file_y = "../dsets/P" + str(mpc_player_id) + "/" + filey_name + +print("file_x:", file_x) +print("file_y:", file_y) +print("DIM_NUM:", DIM_NUM) + + +# training dataset +dataset_x0 = rtt.PrivateTextLineDataset( + file_x, data_owner=0) # P0 hold the file_x data +dataset_x1 = rtt.PrivateTextLineDataset( + file_x, data_owner=1) # P1 hold the file_x data +dataset_y = rtt.PrivateTextLineDataset( + file_y, data_owner=0) # P0 hold the file_y data + + +# dataset decode +def decode_p0(line): + fields = tf.string_split([line], ',').values + fields = rtt.PrivateInput(fields, data_owner=0) + return fields + +def decode_p1(line): + fields = tf.string_split([line], ',').values + fields = rtt.PrivateInput(fields, data_owner=1) + return fields + + +# dataset pipeline +dataset_x0 = dataset_x0 \ + .map(decode_p0)\ + .batch(BATCH_SIZE) + +dataset_x1 = dataset_x1 \ + .map(decode_p1)\ + .batch(BATCH_SIZE) + +dataset_y = dataset_y \ + .map(decode_p0)\ + .batch(BATCH_SIZE) + + +# make iterator +iter_x0 = dataset_x0.make_initializable_iterator() +X0 = iter_x0.get_next() + +iter_x1 = dataset_x1.make_initializable_iterator() +X1 = iter_x1.get_next() + +iter_y = dataset_y.make_initializable_iterator() +Y = iter_y.get_next() + +# Join input X of P0 and P1, features splitted dataset +X = tf.concat([X0, X1], axis=1) + + +# initialize W & b +W = tf.Variable(tf.zeros([DIM_NUM, 1], dtype=tf.float64)) +b = tf.Variable(tf.zeros([1], dtype=tf.float64)) + + +# build lr model +pred_Y = tf.sigmoid(tf.matmul(X, W) + b) +dy = pred_Y - Y +dw = tf.matmul(X, dy, transpose_a=True) * (1.0 / BATCH_SIZE) +db = tf.reduce_sum(dy, axis=0) * (1.0 / BATCH_SIZE) +delta_w = dw * learning_rate +delta_b = db * learning_rate +update_w = W - delta_w +update_b = b - delta_b + + +# update variables +assign_update_w = tf.assign(W, update_w) +assign_update_b = tf.assign(b, update_b) + + +# training +init = tf.global_variables_initializer() +with tf.Session() as sess: + # init var & iter + sess.run(init) + sess.run([iter_x0.initializer, iter_x1.initializer, iter_y.initializer]) + + # train + 
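# each sess.run() below pulls the next batch from the private dataset iterators and applies one SGD update to W and b
+    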
start_time = time.time() + BATCHES = int(ROW_NUM / BATCH_SIZE) + + for e in range(EPOCHES): + for i in range(BATCHES): + sess.run([assign_update_w, assign_update_b]) + + training_use_time = time.time()-start_time + print("training_use_time: {} seconds".format(training_use_time)) + + diff --git a/example/tutorials/code/tf-ds-lr.py b/example/tutorials/code/tf-ds-lr.py new file mode 100644 index 00000000..afa9a87e --- /dev/null +++ b/example/tutorials/code/tf-ds-lr.py @@ -0,0 +1,105 @@ +import math +import os +import sys +import csv +import tensorflow as tf +import numpy as np +import pandas as pd +import time +import argparse + +np.set_printoptions(suppress=True) +np.random.seed(0) + +EPOCHES = 1 +BATCH_SIZE = 32 +learning_rate = 0.03125 +DIM_NUM = 11 +ROW_NUM = 1279 + +file_x = "" +file_y = "" +filex_name = "cls_train_x.csv" +filey_name = "cls_train_y.csv" + +file_x = "../dsets/ALL/" + filex_name +file_y = "../dsets/ALL/" + filey_name + +print("file_x:", file_x) +print("file_y:", file_y) +print("DIM_NUM:", DIM_NUM) + + +# training dataset +dataset_x = tf.data.TextLineDataset(file_x) +dataset_y = tf.data.TextLineDataset(file_y) + + +# dataset decode +def decode_x(line): + fields = tf.string_split([line], ',').values + fields = tf.string_to_number(fields, tf.float64) + return fields + +def decode_y(line): + fields = tf.string_split([line], ',').values + fields = tf.string_to_number(fields, tf.float64) + return fields + + +# dataset pipeline +dataset_x = dataset_x \ + .map(decode_x) \ + .batch(BATCH_SIZE) + +dataset_y = dataset_y \ + .map(decode_y) \ + .batch(BATCH_SIZE) + + +# make iterator +iter_x = dataset_x.make_initializable_iterator() +X = iter_x.get_next() + +iter_y = dataset_y.make_initializable_iterator() +Y = iter_y.get_next() + + +# initialize W & b +W = tf.Variable(tf.zeros([DIM_NUM, 1], dtype=tf.float64)) +b = tf.Variable(tf.zeros([1], dtype=tf.float64)) + + +# build lr model +pred_Y = tf.sigmoid(tf.matmul(X, W) + b) +dy = pred_Y - Y +dw = tf.matmul(X, dy, transpose_a=True) * (1.0 / BATCH_SIZE) +db = tf.reduce_sum(dy, axis=0) * (1.0 / BATCH_SIZE) +delta_w = dw * learning_rate +delta_b = db * learning_rate +update_w = W - delta_w +update_b = b - delta_b + + +# update variables +assign_update_w = tf.assign(W, update_w) +assign_update_b = tf.assign(b, update_b) + + +# training +init = tf.global_variables_initializer() +with tf.Session() as sess: + # init var & iter + sess.run(init) + sess.run([iter_x.initializer, iter_y.initializer]) + + # train + start_time = time.time() + BATCHES = int(ROW_NUM / BATCH_SIZE) + + for e in range(EPOCHES): + for i in range(BATCHES): + sess.run([assign_update_w, assign_update_b]) + + training_use_time = time.time() - start_time + print("training_use_time: {} seconds".format(training_use_time)) diff --git a/example/tutorials/code/tutorials.sh b/example/tutorials/code/tutorials.sh index 8d21b24c..a102dcdd 100755 --- a/example/tutorials/code/tutorials.sh +++ b/example/tutorials/code/tutorials.sh @@ -109,6 +109,10 @@ function run_all() { run_x tf logistic_regression_stat run_x rtt logistic_regression_stat run_x stat logistic_regression_stat logistic + + # dataset pipeline + run_x tf ds-lr + run_x rtt ds-lr } function helper() { diff --git a/python/latticex/rosetta/secure/__init__.py b/python/latticex/rosetta/secure/__init__.py index 7d31ffbc..d01e8dcc 100644 --- a/python/latticex/rosetta/secure/__init__.py +++ b/python/latticex/rosetta/secure/__init__.py @@ -18,6 +18,9 @@ # decorator of secure operators from latticex.rosetta.secure.decorator import * +# 
dataset ops +from latticex.rosetta.secure.data.ops.readers import * + # ops from latticex.rosetta.secure.ops.nn_util import * from latticex.rosetta.secure.ops.gradients_util import * diff --git a/python/latticex/rosetta/secure/data/__init__.py b/python/latticex/rosetta/secure/data/__init__.py new file mode 100644 index 00000000..125d471c --- /dev/null +++ b/python/latticex/rosetta/secure/data/__init__.py @@ -0,0 +1,20 @@ +# ============================================================================== +# Copyright 2020 The LatticeX Foundation +# This file is part of the Rosetta library. +# +# The Rosetta library is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The Rosetta library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with the Rosetta library. If not, see . +# ==============================================================================" +# import secure dataset ops + +from latticex.rosetta.secure.data.ops.readers import PrivateTextLineDataset diff --git a/python/latticex/rosetta/secure/data/ops/__init__.py b/python/latticex/rosetta/secure/data/ops/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/latticex/rosetta/secure/data/ops/readers.py b/python/latticex/rosetta/secure/data/ops/readers.py new file mode 100644 index 00000000..c257e256 --- /dev/null +++ b/python/latticex/rosetta/secure/data/ops/readers.py @@ -0,0 +1,215 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Python wrappers for reader Datasets.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorflow.python.compat import compat +from tensorflow.python.data.ops import dataset_ops +from tensorflow.python.data.util import convert +from tensorflow.python.data.util import structure +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import ops +from tensorflow.python.framework import tensor_shape +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import gen_dataset_ops +from tensorflow.python.ops import gen_experimental_dataset_ops as ged_ops +from tensorflow.python.util.tf_export import tf_export + +from latticex.rosetta.secure.decorator.secure_base_ import _secure_ops + +# TODO(b/64974358): Increase default buffer size to 256 MB. 
+_DEFAULT_READER_BUFFER_SIZE_BYTES = 256 * 1024 # 256 KB + + +def _secure_create_or_validate_filenames_dataset(filenames): + """Creates (or validates) a dataset of filenames. + + Args: + filenames: Either a list or dataset of filenames. If it is a list, it is + convert to a dataset. If it is a dataset, its type and shape is validated. + + Returns: + A dataset of filenames. + """ + if isinstance(filenames, dataset_ops.DatasetV2): + if dataset_ops.get_legacy_output_types(filenames) != dtypes.string: + raise TypeError( + "`filenames` must be a `tf.data.Dataset` of `tf.string` elements.") + if not dataset_ops.get_legacy_output_shapes(filenames).is_compatible_with( + tensor_shape.scalar()): + raise TypeError( + "`filenames` must be a `tf.data.Dataset` of scalar `tf.string` " + "elements.") + else: + filenames = ops.convert_to_tensor(filenames, dtype=dtypes.string) + filenames = array_ops.reshape(filenames, [-1], name="flat_filenames") + filenames = dataset_ops.DatasetV2.from_tensor_slices(filenames) + + return filenames + + +def _secure_create_dataset_reader(dataset_creator, filenames, num_parallel_reads=None): + """Creates a dataset that reads the given files using the given reader. + + Args: + dataset_creator: A function that takes in a single file name and returns a + dataset. + filenames: A `tf.data.Dataset` containing one or more filenames. + num_parallel_reads: The number of parallel reads we should do. + + Returns: + A `Dataset` that reads data from `filenames`. + """ + def read_one_file(filename): + filename = ops.convert_to_tensor(filename, dtypes.string, name="filename") + return dataset_creator(filename) + + if num_parallel_reads is None: + return filenames.flat_map(read_one_file) + else: + raise ValueError( + "not support parallel interleave dataset for PrivateTextLineDataset") + +def _secure_text_line_dataset(filenames, compression_type, buffer_size, data_owner, name=None): + r"""Creates a dataset that emits the lines of one or more text files. + + Args: + filenames: A `Tensor` of type `string`. + A scalar or a vector containing the name(s) of the file(s) to be + read. + compression_type: A `Tensor` of type `string`. + A scalar containing either (i) the empty string (no + compression), (ii) "ZLIB", or (iii) "GZIP". + buffer_size: A `Tensor` of type `int64`. + A scalar containing the number of bytes to buffer. + data_owner: The owner of dataset in MPC, eg. 0: p0, 1: p1, 2: p2. + name: A name for the operation (optional). + + Returns: + A `Tensor` of type `variant`. + """ + # Add nodes to the TensorFlow graph. + return _secure_ops.private_text_line_dataset(filenames=filenames, + compression_type=compression_type, + buffer_size=buffer_size, data_owner=data_owner, name=name) + + +class _PrivateTextLineDataset(dataset_ops.DatasetSource): + """A `Dataset` comprising records from one or more text files.""" + + def __init__(self, filenames, compression_type=None, buffer_size=None, data_owner=None): + """Creates a `_PrivateTextLineDataset`. + + Args: + filenames: A `tf.string` tensor containing one or more filenames. + compression_type: (Optional.) A `tf.string` scalar evaluating to one of + `""` (no compression), `"ZLIB"`, or `"GZIP"`. + buffer_size: (Optional.) A `tf.int64` scalar denoting the number of bytes + to buffer. A value of 0 results in the default buffering values chosen + based on the compression type. + data_owner: The owner of dataset in MPC, eg. 0: p0, 1: p1, 2: p2. 
+ """ + self._filenames = filenames + self._compression_type = convert.optional_param_to_tensor( + "compression_type", + compression_type, + argument_default="", + argument_dtype=dtypes.string) + self._buffer_size = convert.optional_param_to_tensor( + "buffer_size", + buffer_size, + argument_default=_DEFAULT_READER_BUFFER_SIZE_BYTES) + self._data_owner = convert.optional_param_to_tensor( + "data_owner", + data_owner, + argument_default=0) + variant_tensor = _secure_text_line_dataset( + self._filenames, self._compression_type, self._buffer_size, self._data_owner) + super(_PrivateTextLineDataset, self).__init__(variant_tensor) + + @property + def _element_structure(self): + return structure.TensorStructure(dtypes.string, []) + + +# @tf_export("data.TextLineDataset", v1=[]) +class PrivateTextLineDatasetV2(dataset_ops.DatasetSource): + """A `Dataset` comprising lines from one or more text files.""" + + def __init__(self, filenames, compression_type=None, buffer_size=None, data_owner=None, + num_parallel_reads=None): + """Creates a `PrivateTextLineDataset`. + + Args: + filenames: A `tf.string` tensor or `tf.data.Dataset` containing one or + more filenames. + compression_type: (Optional.) A `tf.string` scalar evaluating to one of + `""` (no compression), `"ZLIB"`, or `"GZIP"`. + buffer_size: (Optional.) A `tf.int64` scalar denoting the number of bytes + to buffer. A value of 0 results in the default buffering values chosen + based on the compression type. + data_owner: The owner of dataset in MPC, eg. 0: p0, 1: p1, 2: p2. + num_parallel_reads: (Optional.) A `tf.int64` scalar representing the + number of files to read in parallel. If greater than one, the records of + files read in parallel are outputted in an interleaved order. If your + input pipeline is I/O bottlenecked, consider setting this parameter to a + value greater than one to parallelize the I/O. If `None`, files will be + read sequentially. + """ + filenames = _secure_create_or_validate_filenames_dataset(filenames) + self._filenames = filenames + self._compression_type = compression_type + self._buffer_size = buffer_size + self._data_owner = data_owner + + def creator_fn(filename): + return _PrivateTextLineDataset(filename, compression_type, buffer_size, data_owner) + + self._impl = _secure_create_dataset_reader(creator_fn, filenames) + variant_tensor = self._impl._variant_tensor # pylint: disable=protected-access + + super(PrivateTextLineDatasetV2, self).__init__(variant_tensor) + + @property + def _element_structure(self): + return structure.TensorStructure(dtypes.string, []) + + +# @tf_export(v1=["data.PrivateTextLineDataset"]) +class PrivateTextLineDatasetV1(dataset_ops.DatasetV1Adapter): + """A `Dataset` comprising lines from one or more text files.""" + + def __init__(self, filenames, compression_type=None, buffer_size=None, data_owner=None, + num_parallel_reads=None): + wrapped = PrivateTextLineDatasetV2(filenames, compression_type, buffer_size, data_owner, + num_parallel_reads) + super(PrivateTextLineDatasetV1, self).__init__(wrapped) + __init__.__doc__ = PrivateTextLineDatasetV2.__init__.__doc__ + + @property + def _filenames(self): + return self._dataset._filenames # pylint: disable=protected-access + + @_filenames.setter + def _filenames(self, value): + self._dataset._filenames = value # pylint: disable=protected-access + + + +# TODO(b/119044825): Until all `tf.data` unit tests are converted to V2, keep +# these aliases in place. 
+PrivateTextLineDataset = PrivateTextLineDatasetV1 diff --git a/python/latticex/rosetta/secure/decorator/__init__.py b/python/latticex/rosetta/secure/decorator/__init__.py index 5aec583e..25d09fbb 100644 --- a/python/latticex/rosetta/secure/decorator/__init__.py +++ b/python/latticex/rosetta/secure/decorator/__init__.py @@ -22,6 +22,7 @@ from latticex.rosetta.secure.decorator.secure_matrix_ops_ import * from latticex.rosetta.secure.decorator.secure_reduce_ops_ import * from latticex.rosetta.secure.decorator.secure_ml_ops_ import * +from latticex.rosetta.secure.decorator.secure_io_ops_ import * diff --git a/python/latticex/rosetta/secure/decorator/secure_io_ops_.py b/python/latticex/rosetta/secure/decorator/secure_io_ops_.py new file mode 100644 index 00000000..a62e6446 --- /dev/null +++ b/python/latticex/rosetta/secure/decorator/secure_io_ops_.py @@ -0,0 +1,35 @@ +# ============================================================================== +# Copyright 2020 The LatticeX Foundation +# This file is part of the Rosetta library. +# +# The Rosetta library is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The Rosetta library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with the Rosetta library. If not, see . +# ==============================================================================" +import tensorflow as tf +from tensorflow.python.ops import array_ops +from tensorflow.python.framework import ops +from latticex.rosetta.secure.decorator.secure_base_ import _secure_ops + + +# ----------------------------- +# Secure arithmetic IO ops +# ----------------------------- + +def PrivateInput(x, data_owner, name=None): + return _secure_ops.private_input(x, data_owner, name=name) + + +# export SecureInput name +SecureInput = PrivateInput + + diff --git a/setup.py b/setup.py index 829e8112..94253648 100644 --- a/setup.py +++ b/setup.py @@ -144,7 +144,7 @@ def build_extensions(self): DOCLINES = __doc__.split('\n') -__version__ = '0.2.1' +__version__ = '0.3.0' root_dir = os.path.dirname(os.path.abspath(__file__)) include_dirs = []