Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-33589 Remove unused IByteInputStream code and rename ISerialInputStream #19590

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/thorhelper/csvsplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ void CSVSplitter::setFieldRange(const byte * start, const byte * end, unsigned c
lengths[curColumn] = (size32_t)(internalBuffer + internalOffset - data[curColumn]);
}

unsigned CSVSplitter::splitLine(ISerialStream *stream, size32_t maxRowSize)
unsigned CSVSplitter::splitLine(IBufferedSerialInputStream *stream, size32_t maxRowSize)
{
if (stream->eos())
return 0;
Expand Down
4 changes: 2 additions & 2 deletions common/thorhelper/csvsplitter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* char, while the RFC mentions re-using quotes (""). We implement both.
*/

interface ISerialStream;
interface IBufferedSerialInputStream;
class THORHELPER_API CSVSplitter
{
public:
Expand All @@ -77,7 +77,7 @@ class THORHELPER_API CSVSplitter
void init(unsigned maxColumns, size32_t maxCsvSize, const char *quotes, const char *separators, const char *terminators, const char *escapes, bool preserveWhitespace);
void reset();
size32_t splitLine(size32_t maxLen, const byte * start);
size32_t splitLine(ISerialStream *stream, size32_t maxRowSize);
size32_t splitLine(IBufferedSerialInputStream *stream, size32_t maxRowSize);

inline unsigned * queryLengths() const { return lengths; }
inline const byte * * queryData() const { return data; }
Expand Down
2 changes: 1 addition & 1 deletion common/thorhelper/thorcommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ class CRowStreamReader : public CSimpleInterfaceOf<IExtRowStream>
Linked<IMemoryMappedFile> mmfile;
Linked<IOutputRowDeserializer> deserializer;
Linked<IEngineRowAllocator> allocator;
Owned<ISerialStream> strm;
Owned<IBufferedSerialInputStream> strm;
Owned<ISourceRowPrefetcher> prefetcher;
CThorContiguousRowBuffer prefetchBuffer;
unsigned __int64 progress = 0;
Expand Down
2 changes: 1 addition & 1 deletion common/thorhelper/thorpipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class CBufferedReadRowStream : implements IReadRowStream, public CInterface
}

protected:
Owned<ISerialStream> pipeStream;
Owned<IBufferedSerialInputStream> pipeStream;
CThorStreamDeserializerSource rowSource;
IEngineRowAllocator * rowAllocator;
};
Expand Down
8 changes: 4 additions & 4 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class DiskRowReader : public CInterfaceOf<IDiskRowStream>, implements IDiskRowRe
virtual offset_t getLocalOffset();

protected:
Owned<ISerialStream> inputStream;
Owned<IBufferedSerialInputStream> inputStream;
Owned<IFileIO> inputfileio;
CThorContiguousRowBuffer inputBuffer; // more: move to derived classes.
Owned<IEngineRowAllocator> outputAllocator;
Expand Down Expand Up @@ -1317,9 +1317,9 @@ bool MarkupDiskRowReader::checkOpen()
{
class CSimpleStream : public CSimpleInterfaceOf<ISimpleReadStream>
{
Linked<ISerialStream> stream;
Linked<IBufferedSerialInputStream> stream;
public:
CSimpleStream(ISerialStream *_stream) : stream(_stream)
CSimpleStream(IBufferedSerialInputStream *_stream) : stream(_stream)
{
}
// ISimpleReadStream impl.
Expand Down Expand Up @@ -1530,7 +1530,7 @@ class AlternativeDiskRowReader : public CInterfaceOf<IDiskRowReader>

//Specify where the raw binary input for a particular file is coming from, together with its actual format.
//Does this make sense, or should it be passed a filename? an actual format?
//Needs to specify a filename rather than a ISerialStream so that the interface is consistent for local and remote
//Needs to specify a filename rather than a IBufferedSerialInputStream so that the interface is consistent for local and remote
virtual void clearInput() override
{
directReader->clearInput();
Expand Down
2 changes: 1 addition & 1 deletion common/thorhelper/thorread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ interface IDiskRowReader : extends IRowReader

//Specify where the raw binary input for a particular file is coming from, together with its actual format.
//Does this make sense, or should it be passed a filename? an actual format?
//Needs to specify a filename rather than a ISerialStream so that the interface is consistent for local and remote
//Needs to specify a filename rather than a IBufferedSerialInputStream so that the interface is consistent for local and remote
virtual void clearInput() = 0;

//MORE: It may be better to only have the first of these functions and have the other two functions as global functions that wrap this function
Expand Down
4 changes: 2 additions & 2 deletions ecl/hthor/hthor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -1948,7 +1948,7 @@ class CHThorWorkunitReadActivity : public CHThorSimpleActivityBase

Owned<IOutputRowDeserializer> rowDeserializer;
MemoryBuffer resultBuffer;
Owned<ISerialStream> bufferStream;
Owned<IBufferedSerialInputStream> bufferStream;
CThorStreamDeserializerSource deserializer;

public:
Expand Down Expand Up @@ -2261,7 +2261,7 @@ protected:
IHThorDiskReadBaseArg &helper;
OwnedIFile inputfile;
OwnedIFileIO inputfileio;
Owned<ISerialStream> inputstream;
Owned<IBufferedSerialInputStream> inputstream;
StringAttr tempFileName;
Owned<IDistributedFilePartIterator> dfsParts;
Owned<ILocalOrDistributedFile> ldFile;
Expand Down
12 changes: 6 additions & 6 deletions ecl/hthor/hthorkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1740,7 +1740,7 @@ class FetchRequest : public CInterface
class IFlatFetchHandlerCallback
{
public:
virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream) = 0;
virtual void processFetch(FetchRequest const * fetch, offset_t pos, IBufferedSerialInputStream *rawStream) = 0;
};

class IXmlFetchHandlerCallback
Expand All @@ -1755,7 +1755,7 @@ class FetchPartHandlerBase
{
protected:
Owned<IFileIO> rawFile;
Owned<ISerialStream> rawStream;
Owned<IBufferedSerialInputStream> rawStream;
offset_t base;
offset_t top;
bool blockcompressed;
Expand Down Expand Up @@ -2395,7 +2395,7 @@ class CHThorFlatFetchActivity : public CHThorFetchActivityBase, public IFlatFetc

virtual bool needsAllocator() const { return true; }

virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
virtual void processFetch(FetchRequest const * fetch, offset_t pos, IBufferedSerialInputStream *rawStream)
{
CThorContiguousRowBuffer prefetchSource;
prefetchSource.setStream(rawStream);
Expand Down Expand Up @@ -2542,7 +2542,7 @@ class CHThorCsvFetchActivity : public CHThorFetchActivityBase, public IFlatFetch

virtual bool needsAllocator() const { return true; }

virtual void processFetch(FetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
virtual void processFetch(FetchRequest const * fetch, offset_t pos, IBufferedSerialInputStream *rawStream)
{
rawStream->reset(pos, UnknownOffset);
CriticalBlock procedure(transformCrit);
Expand Down Expand Up @@ -3330,7 +3330,7 @@ class KeyedJoinFetchRequest : public CInterface
class IKeyedJoinFetchHandlerCallback
{
public:
virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, ISerialStream *rawStream) = 0;
virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, IBufferedSerialInputStream *rawStream) = 0;
};

class KeyedJoinFetchPartHandler : public FetchPartHandlerBase, public ThreadedPartHandler<KeyedJoinFetchRequest>
Expand Down Expand Up @@ -3588,7 +3588,7 @@ class CHThorKeyedJoinActivity : public CHThorThreadedActivityBase, implements I
return new KeyedJoinFetchPartHandler(*this, part, base, size, handler, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, prefetcher, rowAllocator);
}

virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, ISerialStream *rawStream)
virtual void processFetch(KeyedJoinFetchRequest const * fetch, offset_t pos, IBufferedSerialInputStream *rawStream)
{
CThorContiguousRowBuffer prefetchSource;
prefetchSource.setStream(rawStream);
Expand Down
10 changes: 5 additions & 5 deletions fs/dafsclient/rmtfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2242,7 +2242,7 @@ class CRemoteFilteredFileIOBase : public CRemoteBase, implements IRemoteFileIO
IMPLEMENT_IINTERFACE_O_USING(CRemoteBase);

// Really a stream, but life (maybe) easier elsewhere if looks like a file
// Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
// Sometime should refactor to be based on IBufferedSerialInputStream instead - or maybe IRowStream.
CRemoteFilteredFileIOBase(SocketEndpoint &ep, const char *filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
: CRemoteBase(ep, filename)
{
Expand Down Expand Up @@ -2476,7 +2476,7 @@ class CRemoteFilteredFileIO : public CRemoteFilteredFileIOBase
{
public:
// Really a stream, but life (maybe) easier elsewhere if looks like a file
// Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
// Sometime should refactor to be based on IBufferedSerialInputStream instead - or maybe IRowStream.
CRemoteFilteredFileIO(SocketEndpoint &ep, const char *filename, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, bool compressed, bool grouped, unsigned __int64 chooseN)
: CRemoteFilteredFileIOBase(ep, filename, actual, projected, fieldFilters, chooseN)
{
Expand Down Expand Up @@ -2541,7 +2541,7 @@ class CRemoteFilteredKeyIO : public CRemoteFilteredFileIOBase
{
public:
// Really a stream, but life (maybe) easier elsewhere if looks like a file
// Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
// Sometime should refactor to be based on IBufferedSerialInputStream instead - or maybe IRowStream.
CRemoteFilteredKeyIO(SocketEndpoint &ep, const char *filename, unsigned crc, IOutputMetaData *actual, IOutputMetaData *projected, const RowFilter &fieldFilters, unsigned __int64 chooseN)
: CRemoteFilteredFileIOBase(ep, filename, actual, projected, fieldFilters, chooseN)
{
Expand All @@ -2554,7 +2554,7 @@ class CRemoteFilteredKeyCountIO : public CRemoteFilteredFileIOBase
{
public:
// Really a stream, but life (maybe) easier elsewhere if looks like a file
// Sometime should refactor to be based on ISerialStream instead - or maybe IRowStream.
// Sometime should refactor to be based on IBufferedSerialInputStream instead - or maybe IRowStream.
CRemoteFilteredKeyCountIO(SocketEndpoint &ep, const char *filename, unsigned crc, IOutputMetaData *actual, const RowFilter &fieldFilters, unsigned __int64 rowLimit)
: CRemoteFilteredFileIOBase(ep, filename, actual, actual, fieldFilters, rowLimit)
{
Expand All @@ -2569,7 +2569,7 @@ class CRemoteKey : public CSimpleInterfaceOf<IIndexLookup>
offset_t pos = 0;
Owned<ISourceRowPrefetcher> prefetcher;
CThorContiguousRowBuffer prefetchBuffer;
Owned<ISerialStream> strm;
Owned<IBufferedSerialInputStream> strm;
bool pending = false;
SocketEndpoint ep;
StringAttr filename;
Expand Down
6 changes: 3 additions & 3 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ class CRemoteStreamReadBaseActivity : public CRemoteDiskBaseActivity, implements
typedef CRemoteDiskBaseActivity PARENT;

protected:
Owned<ISerialStream> inputStream;
Owned<IBufferedSerialInputStream> inputStream;
Owned<IFileIO> iFileIO;
unsigned __int64 chooseN = 0;
unsigned __int64 startPos = 0;
Expand Down Expand Up @@ -1964,9 +1964,9 @@ class CRemoteMarkupReadActivity : public CRemoteExternalFormatReadActivity, impl

class CSimpleStream : public CSimpleInterfaceOf<ISimpleReadStream>
{
Linked<ISerialStream> stream;
Linked<IBufferedSerialInputStream> stream;
public:
CSimpleStream(ISerialStream *_stream) : stream(_stream)
CSimpleStream(IBufferedSerialInputStream *_stream) : stream(_stream)
{
}
// ISimpleReadStream impl.
Expand Down
4 changes: 2 additions & 2 deletions fs/dafsstream/dafsstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ class CDaFileSrvClientBase : public CInterfaceOf<IDFUFilePartBase>
}
};

class CDFUPartReader : public CDaFileSrvClientBase, implements IDFUFilePartReader, implements ISerialStream
class CDFUPartReader : public CDaFileSrvClientBase, implements IDFUFilePartReader, implements IBufferedSerialInputStream
{
typedef CDaFileSrvClientBase PARENT;

Expand Down Expand Up @@ -855,7 +855,7 @@ class CDFUPartReader : public CDaFileSrvClientBase, implements IDFUFilePartReade
}
ensureAvailable(0, nullptr); // reads from replyMb
}
// ISerialStream impl.
// IBufferedSerialInputStream impl.
virtual const void *peek(size32_t wanted, size32_t &got) override
{
if (bufRemaining >= wanted)
Expand Down
10 changes: 5 additions & 5 deletions roxie/ccd/ccdactivities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,7 @@ class CsvRecordProcessor : public RecordProcessor
}
}
csvSplitter.init(helper->getMaxColumns(), csvInfo, quotes, separators, terminators, escapes);
ISerialStream *stream = reader->queryDirectStreamReader();
IBufferedSerialInputStream *stream = reader->queryDirectStreamReader();
while (!aborted)
{
size32_t thisLineLength = csvSplitter.splitLine(stream, maxRowSize);
Expand Down Expand Up @@ -2043,7 +2043,7 @@ class CParallelRoxieDiskAggregateActivity : public CParallelRoxieActivity
RtlDynamicRowBuilder finalBuilder(rowAllocator, false);
if (deserializer)
{
Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
Owned<IBufferedSerialInputStream> stream = createMemoryBufferSerialStream(m);
CThorStreamDeserializerSource rowSource(stream);

while (m.remaining())
Expand Down Expand Up @@ -2309,7 +2309,7 @@ class CParallelRoxieDiskGroupAggregateActivity : public CParallelRoxieActivity
{
CriticalBlock b(parCrit); // MORE - use a spinlock
MemoryBuffer &m = d.data;
Owned<ISerialStream> stream = createMemoryBufferSerialStream(m);
Owned<IBufferedSerialInputStream> stream = createMemoryBufferSerialStream(m);
CThorStreamDeserializerSource rowSource(stream);
while (m.remaining())
{
Expand Down Expand Up @@ -3705,7 +3705,7 @@ class CRoxieFetchActivityBase : public CRoxieAgentActivity
IHThorFetchBaseArg *helper;
const CRoxieFetchActivityFactory *factory;
Owned<IFileIO> rawFile;
Owned<ISerialStream> rawStream;
Owned<IBufferedSerialInputStream> rawStream;
offset_t base;
char *inputData;
char *inputLimit;
Expand Down Expand Up @@ -4478,7 +4478,7 @@ class CRoxieKeyedJoinFetchActivity : public CRoxieAgentActivity
const char *inputData;
Linked<ITranslatorSet> translators;
Linked<IFileIOArray> files;
Owned<ISerialStream> rawStream;
Owned<IBufferedSerialInputStream> rawStream;
CThorContiguousRowBuffer prefetchSource;
Owned<ISourceRowPrefetcher> prefetcher;
const IDynamicTransform *translator = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ class RawDataReader : public WorkUnitRowReaderBase
const IRoxieContextLogger &logctx;
byte *bufferBase;
MemoryBuffer blockBuffer;
Owned<ISerialStream> bufferStream;
Owned<IBufferedSerialInputStream> bufferStream;
CThorStreamDeserializerSource rowSource;
bool eof;
bool eogPending;
Expand Down
6 changes: 3 additions & 3 deletions roxie/ccd/ccdkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ typedef IArrayOf<InMemoryIndex> InMemoryIndexSet;
* (the file boundaries are the only place we are sure there are record boundaries - fixed size case could do better in theory)
* The disk case gives an empty set to all but the first thread - could certainly do better
* How it should work
* IDirectReader should be derived from ISerialStream (with the addition of the ptr to offset mapping functions)
* IDirectReader should be derived from IBufferedSerialInputStream (with the addition of the ptr to offset mapping functions)
* We should not peek past end of a single file
* We are then free to use memory-mapped files and/or existing fileIO stream code, and IDirectReader is much simpler,
* just handling the move from one file to the next and the PtrToOffset stuff
Expand Down Expand Up @@ -562,7 +562,7 @@ class InMemoryDirectReader : public CDirectReaderBase
pos = (memsize_t) _readPos;
}

// Interface ISerialStream
// Interface IBufferedSerialInputStream

virtual const void * peek(size32_t wanted,size32_t &got) override
{
Expand Down Expand Up @@ -672,7 +672,7 @@ class BufferedDirectReader : public CDirectReaderBase
offset_t thisFileStartPos;
offset_t completedStreamsSize;
Owned<IFileIO> thisPart;
Owned<ISerialStream> curStream;
Owned<IBufferedSerialInputStream> curStream;
unsigned thisPartIdx;

BufferedDirectReader(const RowFilter &_postFilter, bool _grouped, offset_t _startPos, IFileIOArray *_f, unsigned _partNo, unsigned _numParts, const ITranslatorSet *_translators)
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdkey.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ typedef IArrayOf<IKeySegmentMonitor> SegMonitorArray;
* IDirectReader (this remains TBD at this point)
*
*/
interface IDirectStreamReader : extends ISerialStream, extends ISimpleReadStream
interface IDirectStreamReader : extends IBufferedSerialInputStream, extends ISimpleReadStream
{
virtual unsigned queryFilePart() const = 0; // used by CSV
virtual unsigned __int64 makeFilePositionLocal(offset_t pos) = 0; // used by XML
Expand Down
2 changes: 1 addition & 1 deletion roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4376,7 +4376,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie

// this is only used to avoid recreating a bufferStream for each row. A better solution may be needed
MemoryBuffer tempRowBuffer;
Owned<ISerialStream> bufferStream;
Owned<IBufferedSerialInputStream> bufferStream;
CThorStreamDeserializerSource rowSource;

protected:
Expand Down
6 changes: 3 additions & 3 deletions rtl/eclrtl/rtlcommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
#include "junicode.hpp"
#include "rtlcommon.hpp"

CContiguousRowBuffer::CContiguousRowBuffer(ISerialStream * _in) : in(_in)
CContiguousRowBuffer::CContiguousRowBuffer(IBufferedSerialInputStream * _in) : in(_in)
{
clearBuffer();
}

void CContiguousRowBuffer::setStream(ISerialStream *_in)
void CContiguousRowBuffer::setStream(IBufferedSerialInputStream *_in)
{
in = _in;
clearBuffer();
Expand Down Expand Up @@ -48,7 +48,7 @@ void CContiguousRowBuffer::peekBytesDirect(unsigned maxSize)

//---------------------------------------------------------------------------------------------------------------------

CThorContiguousRowBuffer::CThorContiguousRowBuffer(ISerialStream * _in) : CContiguousRowBuffer(_in)
CThorContiguousRowBuffer::CThorContiguousRowBuffer(IBufferedSerialInputStream * _in) : CContiguousRowBuffer(_in)
{
readOffset = 0;
}
Expand Down
10 changes: 5 additions & 5 deletions rtl/eclrtl/rtlcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class ECLRTL_API CContiguousRowBuffer
{
public:
CContiguousRowBuffer() = default;
CContiguousRowBuffer(ISerialStream * _in);
CContiguousRowBuffer(IBufferedSerialInputStream * _in);

void setStream(ISerialStream *_in);
void setStream(IBufferedSerialInputStream *_in);
const byte * peekBytes(size32_t maxSize);
const byte * peekFirstByte();
void skipBytes(size32_t size)
Expand Down Expand Up @@ -59,7 +59,7 @@ class ECLRTL_API CContiguousRowBuffer
protected:
const byte * cur = nullptr;
private:
ISerialStream* in = nullptr;
IBufferedSerialInputStream* in = nullptr;
const byte * buffer = nullptr;
size32_t available = 0;
};
Expand All @@ -71,9 +71,9 @@ class ECLRTL_API CThorContiguousRowBuffer : public CContiguousRowBuffer, impleme
{
public:
CThorContiguousRowBuffer() = default;
CThorContiguousRowBuffer(ISerialStream * _in);
CThorContiguousRowBuffer(IBufferedSerialInputStream * _in);

inline void setStream(ISerialStream *_in)
inline void setStream(IBufferedSerialInputStream *_in)
{
CContiguousRowBuffer::setStream(_in);
readOffset = 0;
Expand Down
Loading
Loading