HADOOP-19767: [ABFS] Introduce Abfs Input Policy for detecting read patterns #8153
base: trunk
Conversation
@anujmodi2021 I am trying to propose a single optimised implementation of an input stream across cloud implementations, as I think we all need this kind of logic. Ideally I want to get to a place where 80% of the logic is shared in a common layer, and then we only implement cloud-specific clients to actually make the requests separately. There is some consensus to move the shared logic into the parquet-java repo: https://lists.apache.org/thread/nbksq32cs8h1ldj8762y6wh9zzp8gqx6 , and some buy-in from the team at Google. I'll be following up on this in the new year. Would be great to get your thoughts, and whether your team would also like to collaborate on this.
```java
 */
public class AbfsAdaptiveInputStream extends AbfsInputStream {

  public AbfsAdaptiveInputStream(
```
nit: needs javadoc comment
Added
```java
/**
 * Enum for ABFS Input Policies.
 * Each policy maps to a particular implementation of {@link AbfsInputStream}
 */
public enum AbfsInputPolicy {
```
Should this be ReadPolicy or InputStreamPolicy instead of InputPolicy?
Changed to ReadPolicy.
Anything is fine IMO
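For readers following along, the renamed enum presumably looks something like this minimal sketch. The policy strings are inlined as plain literals here; the real code uses the `FS_OPTION_OPENFILE_READ_POLICY_*` constants shown in the diff below.

```java
// Illustrative sketch of a read-policy enum, not the exact ABFS code:
// each policy carries the string value supplied through configuration.
public enum ReadPolicySketch {
    SEQUENTIAL("sequential"),
    RANDOM("random"),
    ADAPTIVE("adaptive");

    private final String policy;

    ReadPolicySketch(String policy) {
        this.policy = policy;
    }

    public String getPolicy() {
        return policy;
    }
}
```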
```java
  RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM),
  ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);

  private final String policy;
```
same as above
Taken
```java
        abfsInputStreamContext, eTag, tracingContext);
  }

  @Override
```
nit: use {@inheritDoc}
Taken
```diff
- import static org.apache.hadoop.fs.azurebfs.constants.ReadType.NORMAL_READ;
- import static org.apache.hadoop.fs.azurebfs.constants.ReadType.PREFETCH_READ;
- import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ;
+ import static org.apache.hadoop.fs.azurebfs.constants.ReadType.*;
```
same as above
Taken
```java
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderVersion;
import org.apache.hadoop.fs.impl.OpenFileParameters;

import static org.apache.hadoop.fs.Options.OpenFileOptions.*;
```
"*" import should be reverted
Taken
```java
/*
 * Test to verify Random Read Type.
 * Settin Read Policy to Parquet ensures Random Read Type.
```
nit: setting
Taken
```java
ArgumentCaptor<TracingContext> captor9 = ArgumentCaptor.forClass(TracingContext.class);

List<String> paths = captor1.getAllValues();
System.out.println(paths);
```
remove the sysouts
Thanks for catching. Taken
```java
/**
 * Constructs AbfsAdaptiveInputStream
 * @param client AbfsClient to be used for read operations
 * @param statistics to recordinput stream statistics
```
nit: space
Taken
```java
}

/**
 * inheritDoc
```
nit: {@inheritDoc}
Taken
```java
}

@Test
public void testRandomInputStreamDoesNotQueuePrefetches() throws Exception {
```
nit: javadoc
Taken
```diff
- protected final int readAheadRange;
+ private final int readAheadRange;

+ private boolean firstRead = true;
```
nit: add a comment here, since other params can be added later
Added
```diff
  public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;

- public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true;
+ public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = false;
```
we don't need to disable it, I think. We only add the request header if the read type is set to prefetch
Thanks for catching. Will revert
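The reviewer's suggestion can be sketched as follows. This is illustrative only: the `ReadType` values come from the imports shown earlier in this PR, but the header string is a made-up placeholder, not the actual ABFS request header.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: attach the priority hint only when the read is a prefetch,
// instead of gating it behind a global enable/disable config.
// "x-ms-priority: low" is a made-up placeholder header name.
public class PrefetchHeaderSketch {
    public enum ReadType { NORMAL_READ, PREFETCH_READ, SMALLFILE_READ }

    public static List<String> buildHeaders(ReadType readType) {
        List<String> headers = new ArrayList<>();
        if (readType == ReadType.PREFETCH_READ) {
            // Background prefetches carry a low-priority hint so they
            // do not compete with foreground reads.
            headers.add("x-ms-priority: low");
        }
        return headers;
    }
}
```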
```diff
   */
  @VisibleForTesting
- long getLimit() {
+ int getLimit() {
```
why was this changed from long to int?
limit is already an int; the caller would otherwise have needed unnecessary handling.
```diff
   * @throws IOException if there is an error
   */
- protected abstract int readOneBlock(final byte[] b, final int off, final int len) throws IOException;
+ protected abstract int readOneBlock(byte[] b, int off, int len) throws IOException;
```
shouldn't these parameters still be kept final?
This is the abstract method's definition; the real implementation still has final.
Checkstyle reported final as a redundant modifier here.
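A tiny standalone example of the checkstyle point: `final` on an abstract method's parameters is meaningless (there is no body in which they could be reassigned), while the concrete override can still declare its own parameters `final`. The class names below are sketches, not the actual AbfsInputStream hierarchy.

```java
// Sketch only; illustrates redundant `final` on abstract parameters.
public abstract class BlockReaderSketch {
    // No `final` needed here: an abstract declaration has no body,
    // so the modifier would be redundant.
    protected abstract int readOneBlock(byte[] b, int off, int len);

    public static final class FixedReader extends BlockReaderSketch {
        @Override
        protected int readOneBlock(final byte[] b, final int off, final int len) {
            // Pretend the read always fills exactly `len` bytes.
            return len;
        }
    }
}
```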
```diff
- private final Stack<Integer> removedBufferList = new Stack<>();
+ private final ConcurrentSkipListSet<Integer> removedBufferList = new ConcurrentSkipListSet<>();
+ private ConcurrentSkipListSet<Integer> freeList = new ConcurrentSkipListSet<>();
```
Add comments for this change
Added
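To illustrate the data-structure change (a hedged sketch, not the actual ReadBufferManager code): a `ConcurrentSkipListSet` of buffer indexes is thread-safe without external synchronization, rejects duplicate entries, and `pollFirst()` hands out the lowest free slot deterministically, which a `Stack` does not guarantee under concurrent access.

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Sketch of tracking free read-ahead buffer slots with a
// ConcurrentSkipListSet instead of a Stack. Names are illustrative.
public class FreeListSketch {
    private final ConcurrentSkipListSet<Integer> freeList = new ConcurrentSkipListSet<>();

    public FreeListSketch(int numBuffers) {
        for (int i = 0; i < numBuffers; i++) {
            freeList.add(i);
        }
    }

    /** Returns the lowest free buffer index, or -1 if none are free. */
    public int acquire() {
        Integer idx = freeList.pollFirst();
        return idx == null ? -1 : idx;
    }

    /** Returns a buffer index to the pool; duplicates are ignored by the set. */
    public void release(int idx) {
        freeList.add(idx);
    }

    public int available() {
        return freeList.size();
    }
}
```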
```diff
  getInProgressList().clear();
  getCompletedReadList().clear();
- getFreeList().clear();
+ this.freeList.clear();
```
clearFreeList() method can be called here
Taken
```java
if (!validate(b, off, len)) {
  return -1;
}
//If buffer is empty, then fill the buffer.
```
Space missing between // and If.
The same issue exists at multiple other places; please correct it everywhere.
Same thing is present in AbfsRandomInputStream and AbfsPrefetchInputStream as well.
Taken
```java
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
```
instead of importing the entire Options class, we can just import the OpenFileOptions class and directly mention OpenFileOptions in the comments below:
```java
import org.apache.hadoop.fs.Options.OpenFileOptions;
```
Taken
```java
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
/**
 * Provides hint for the read workload pattern.
 * Possible Values Exposed in {@link Options.OpenFileOptions}
```
Should it be OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES? OpenFileOptions has other values as well. If so, we can do
```java
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES;
```
and mention FS_OPTION_OPENFILE_READ_POLICIES only.
Tried but that variable is not resolvable here
You can do something like this:
```java
import org.apache.hadoop.fs.Options.OpenFileOptions;
```
and in the javadoc: `Possible Values Exposed in {@link OpenFileOptions#FS_OPTION_OPENFILE_READ_POLICIES}`
```java
}
//If buffer is empty, then fill the buffer.
if (getBCursor() == getLimit()) {
  //If EOF, then return -1
```
Same as above
Taken
```java
}

long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
```
Same as above
Taken
```java
  return buffer;
}

protected void setBuffer(byte[] buffer) {
```
Java doc missing
Taken
```java
 * @return the corresponding AbsInputPolicy to be used
 */
public static AbfsReadPolicy getAbfsReadPolicy(String name) {
  String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
```
This variable can be renamed to something more descriptive, like readPolicyStr.
Taken
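The parsing under discussion presumably follows this shape (a hedged sketch: the enum values and the adaptive fallback are assumptions, not the exact ABFS behaviour):

```java
import java.util.Locale;

// Sketch of a name-to-policy lookup in the spirit of
// AbfsReadPolicy.getAbfsReadPolicy(String); illustrative only.
public class ReadPolicyLookup {
    public enum Policy { SEQUENTIAL, RANDOM, ADAPTIVE }

    public static Policy fromName(String name) {
        // Normalise the configured hint before matching.
        String readPolicyStr = name.trim().toLowerCase(Locale.ENGLISH);
        switch (readPolicyStr) {
            case "sequential":
                return Policy.SEQUENTIAL;
            case "random":
                return Policy.RANDOM;
            default:
                // Assumption: unknown hints fall back to the adaptive stream.
                return Policy.ADAPTIVE;
        }
    }
}
```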
```java
      parameters.map(OpenFileParameters::getOptions),
      contextEncryptionAdapter),
    eTag, tracingContext);
AbfsReadPolicy inputPolicy = AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy());
```
This method already contains a lot of lines. Instead of defining this switch case here, it would be better to define a new method, so that if a new read pattern is introduced tomorrow we can just update that method.
Taken
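The suggested refactor could look roughly like this factory sketch. The stream class names come from elsewhere in the discussion, but the method shape is an assumption; it returns the implementation name rather than a real stream so the sketch stays self-contained.

```java
// Sketch: isolate the policy -> stream-implementation switch in one
// factory method so a new read pattern only touches this spot.
public class InputStreamFactorySketch {
    public enum Policy { SEQUENTIAL, RANDOM, ADAPTIVE }

    /** Maps a read policy to the (hypothetical) stream class to construct. */
    public static String inputStreamClassFor(Policy policy) {
        switch (policy) {
            case RANDOM:
                return "AbfsRandomInputStream";
            case SEQUENTIAL:
                return "AbfsPrefetchInputStream";
            default:
                return "AbfsAdaptiveInputStream";
        }
    }
}
```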
Thanks for the heads up @ahmarsuhail. With this change we are not changing how ABFS handles Parquet files though. This just improves the infra and adds the capability for future improvements to be plugged in seamlessly. We will surely help address any gaps in ABFS to make things better for the common ground you are gearing up to improve.
💔 -1 overall
This message was automatically generated.
Description of PR
Since the inception of the ABFS driver, there has been a single implementation of AbfsInputStream. Different kinds of workloads require different heuristics to give the best performance for that type of workload. For example:
- Sequential read workloads like DFSIO and DistCp gain performance from prefetching.
- Random read workloads, on the other hand, do not need prefetches; enabling prefetches for them is an overhead and TPS heavy.
- Query workloads involving Parquet/ORC files benefit from improvements like footer reads and small-file reads.
To accommodate this we need to detect the read pattern and accordingly create input streams implemented for that particular pattern. Moving ahead, more relevant policies and specialized implementations of AbfsInputStream can be added.
This PR only refactors the way we create input streams; no logical change is introduced. As of today, by default we will continue to use AbfsAdaptiveInputStream, which can cater to all kinds of workloads.
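The kind of pattern detection an adaptive stream performs can be sketched as below. The heuristic (flip to random mode on the first non-contiguous read and stop prefetching) is a deliberate simplification for illustration, not the exact AbfsAdaptiveInputStream logic.

```java
// Simplified, self-contained model of adaptive read-pattern detection.
public class AdaptivePatternSketch {
    private long lastReadEnd = 0;
    private boolean randomMode = false;

    /**
     * Records a read of `len` bytes at `offset`.
     * @return true if prefetching should still be queued for this stream.
     */
    public boolean onRead(long offset, int len) {
        if (!randomMode && lastReadEnd > 0 && offset != lastReadEnd) {
            // Non-contiguous access: treat the workload as random and
            // stop queueing prefetches from now on.
            randomMode = true;
        }
        lastReadEnd = offset + len;
        return !randomMode;
    }

    public boolean isRandomMode() {
        return randomMode;
    }
}
```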
How was this patch tested?
New tests were added.