Commit 5f3dd0b

Feature/113 info permissions config (#114)
* #113 atum info file permissions for hdfs loaded from `atum.hdfs.info.file.permissions` config value
  - tests use MiniDfsCluster to assert correct behavior under controlled conditions
  - test update (a custom MiniDfsCluster with umask 000 allows maximum permissions)
  - HdfsFileUtils.DefaultFilePermissions is now publicly exposed; the user is expected to compose the default and the configured value on their own, e.g.:
`HdfsFileUtils.getInfoFilePermissionsFromConfig().getOrElse(HdfsFileUtils.DefaultFilePermissions)`
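
A minimal end-to-end sketch of the intended usage (the "644" value, the path, and the FileSystem wiring are illustrative assumptions, not part of this commit):

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import za.co.absa.atum.utils.HdfsFileUtils

// hypothetical config; in practice the key would live in application.conf
val customConfig = ConfigFactory.parseString("atum.hdfs.info.file.permissions = \"644\"")

// compose the configured value with the publicly exposed default
// (note: DefaultFilePermissions resolves the umask via SparkContext.getOrCreate(), per the diff below)
val permissions = HdfsFileUtils.getInfoFilePermissionsFromConfig(customConfig)
  .getOrElse(HdfsFileUtils.DefaultFilePermissions)

implicit val fs: FileSystem = FileSystem.get(new Configuration())
HdfsFileUtils.saveStringDataToFile(new Path("/tmp/example/_INFO"), "{}", permissions)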
dk1844 authored Oct 11, 2021
1 parent 9de161b commit 5f3dd0b
Showing 7 changed files with 182 additions and 7 deletions.
13 changes: 13 additions & 0 deletions atum/pom.xml
@@ -56,6 +56,12 @@
<version>3.5</version>
</dependency>

<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>${typesafe.config.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-scala_${scala.binary.version}</artifactId>
@@ -70,6 +76,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
@@ -24,7 +24,8 @@ import za.co.absa.atum.utils.HdfsFileUtils
case class ControlMeasuresHdfsStorerJsonFile(path: Path)(implicit val outputFs: FileSystem) extends HadoopFsControlMeasuresStorer {
override def store(controlInfo: ControlMeasure): Unit = {
val serialized = ControlMeasuresParser asJson controlInfo
HdfsFileUtils.saveStringDataToFile(path, serialized)
HdfsFileUtils.saveStringDataToFile(path, serialized,
HdfsFileUtils.getInfoFilePermissionsFromConfig().getOrElse(HdfsFileUtils.DefaultFilePermissions))
}

override def getInfo: String = {
39 changes: 34 additions & 5 deletions atum/src/main/scala/za/co/absa/atum/utils/HdfsFileUtils.scala
@@ -17,13 +17,40 @@ package za.co.absa.atum.utils

import java.io.IOException

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

object HdfsFileUtils {
final val FilePermissionsKey = "atum.hdfs.info.file.permissions"

private val hadoopConfiguration = SparkContext.getOrCreate().hadoopConfiguration
final val DefaultFilePermissions = FsPermission.getFileDefault.applyUMask(
FsPermission.getUMask(FileSystem.get(hadoopConfiguration).getConf)
)

/**
 * Reads FS permissions from a Typesafe config at key [[za.co.absa.atum.utils.HdfsFileUtils#FilePermissionsKey]].
 * Consider falling back to [[za.co.absa.atum.utils.HdfsFileUtils#DefaultFilePermissions]] when this method yields None, e.g.:
 * {{{
 * HdfsFileUtils.getInfoFilePermissionsFromConfig()
 *   .getOrElse(HdfsFileUtils.DefaultFilePermissions)
 * }}}
 *
 * @param config config to read the permissions key from (defaults to the application config)
 * @return Some(FsPermission) if the key was found, None otherwise
 */
def getInfoFilePermissionsFromConfig(config: Config = ConfigFactory.load()): Option[FsPermission] = {
if (config.hasPath(FilePermissionsKey)) {
Some(new FsPermission(config.getString(FilePermissionsKey)))
} else {
None
}
}

def readHdfsFileToString(path: Path)(implicit inputFs: FileSystem): String = {
val stream = inputFs.open(path)
@@ -36,16 +63,18 @@ object HdfsFileUtils {
/**
* Writes string data to a HDFS Path
*
* @param path Path to write to
* @param data data to write
* @param outputFs hadoop FS to use
* @param path Path to write to
* @param data data to write
* @param outputFs hadoop FS to use
* @param filePermissions desired permissions to use for the file written
* @throws IOException when data write errors occur
*/
def saveStringDataToFile(path: Path, data: String)(implicit outputFs: FileSystem): Unit = {
def saveStringDataToFile(path: Path, data: String, filePermissions: FsPermission = DefaultFilePermissions)
(implicit outputFs: FileSystem): Unit = {
import ARMImplicits._
for (fos <- outputFs.create(
path,
new FsPermission("777"),
filePermissions,
true,
4096,
outputFs.getDefaultReplication(path),
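
For context on the DefaultFilePermissions value introduced above: it applies the cluster's umask to Hadoop's file default. A minimal sketch of that arithmetic (the 022 umask is an assumed illustration):

import org.apache.hadoop.fs.permission.FsPermission

val fileDefault = FsPermission.getFileDefault            // 666 (rw-rw-rw-) before any umask
// under the common default umask 022, the effective default becomes 644 (rw-r--r--)
assert(fileDefault.applyUMask(new FsPermission("022")) == new FsPermission("644"))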
@@ -183,7 +183,8 @@ object ControlMeasureUtils {
case JsonType.Pretty => cm.asJsonPretty
}

HdfsFileUtils.saveStringDataToFile(infoPath, jsonString)
HdfsFileUtils.saveStringDataToFile(infoPath, jsonString,
HdfsFileUtils.getInfoFilePermissionsFromConfig().getOrElse(HdfsFileUtils.DefaultFilePermissions))

log.info("Info file written: " + infoPath.toUri.toString)
log.info("JSON written: " + jsonString)
97 changes: 97 additions & 0 deletions atum/src/test/scala/za/co/absa/atum/utils/HdfsFileUtilsSpec.scala
@@ -0,0 +1,97 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.utils

import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsPermission
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class HdfsFileUtilsSpec extends AnyFlatSpec with Matchers with SparkTestBase with MiniDfsClusterBase {

override def getConfiguration: Configuration = {
val cfg = new Configuration()
cfg.set("fs.permissions.umask-mode", "000")
cfg
}

private val Content = "Testing Content"

"HdfsFileUtils" should "write a file to HDFS (default permissions)" in {
val path = new Path("/tmp/hdfs-file-utils-test/def-perms.file")

HdfsFileUtils.getInfoFilePermissionsFromConfig() shouldBe None // key not present => the default is used below
HdfsFileUtils.saveStringDataToFile(path, Content)

fs.exists(path) shouldBe true
fs.getFileStatus(path).getPermission shouldBe HdfsFileUtils.DefaultFilePermissions
fs.deleteOnExit(path)
}

it should "write a file to HDFS (max permissions)" in {
val path = new Path("/tmp/hdfs-file-utils-test/max-perms.file")

val customConfig = ConfigFactory.empty()
.withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef("777"))
HdfsFileUtils.saveStringDataToFile(path, Content, HdfsFileUtils.getInfoFilePermissionsFromConfig(customConfig).get)

fs.exists(path) shouldBe true
// This relies on the MiniDFSCluster running with umask 000; the default umask 022 would cap permissions at 755.
fs.getFileStatus(path).getPermission shouldBe new FsPermission("777")
fs.deleteOnExit(path)
}

it should "write a file to HDFS (min permissions)" in {
val path = new Path("/tmp/hdfs-file-utils-test/min-perms.file")
val customConfig = ConfigFactory.empty()
.withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef("000"))
HdfsFileUtils.saveStringDataToFile(path, Content, HdfsFileUtils.getInfoFilePermissionsFromConfig(customConfig).get)

fs.exists(path) shouldBe true
fs.getFileStatus(path).getPermission shouldBe new FsPermission("000")
fs.deleteOnExit(path)
}

it should "write a file to HDFS (custom permissions)" in {
val path = new Path("/tmp/hdfs-file-utils-test/custom-perms.file")
val customConfig = ConfigFactory.empty()
.withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef("751"))
HdfsFileUtils.saveStringDataToFile(path, Content, HdfsFileUtils.getInfoFilePermissionsFromConfig(customConfig).get)

fs.exists(path) shouldBe true
fs.getFileStatus(path).getPermission shouldBe new FsPermission("751")
fs.deleteOnExit(path)
}

Seq(
"garbage$55%$",
"",
"1"
).foreach { invalidFsPermissionString =>
it should s"fail on invalid permissions config (case $invalidFsPermissionString)" in {
val customConfig = ConfigFactory.empty()
.withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef(invalidFsPermissionString))

intercept[IllegalArgumentException] {
HdfsFileUtils.getInfoFilePermissionsFromConfig(customConfig)
}
}
}


}
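
The umask comment in the max-permissions test above deserves spelling out: HDFS masks the permissions passed to create() with the client's fs.permissions.umask-mode, which is why the spec pins it to 000. A sketch of the masking arithmetic (values illustrative):

import org.apache.hadoop.fs.permission.FsPermission

val requested = new FsPermission("777")
// under the default umask 022, group/other write bits are stripped: 777 -> 755
assert(requested.applyUMask(new FsPermission("022")) == new FsPermission("755"))
// with umask 000 (as the spec above configures), the request survives intact
assert(requested.applyUMask(new FsPermission("000")) == new FsPermission("777"))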
32 changes: 32 additions & 0 deletions atum/src/test/scala/za/co/absa/atum/utils/MiniDfsClusterBase.scala
@@ -0,0 +1,32 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.scalatest.{BeforeAndAfterAll, Suite}

trait MiniDfsClusterBase extends BeforeAndAfterAll { this: Suite =>

protected def getConfiguration: Configuration = new Configuration()

private val miniDFSCluster = new MiniDFSCluster(getConfiguration, 1, true, null)
implicit val fs = miniDFSCluster.getFileSystem()

override def afterAll(): Unit = {
miniDFSCluster.shutdown()
}
}
2 changes: 2 additions & 0 deletions pom.xml
@@ -127,6 +127,8 @@
<aws.java.sdk.version>2.13.65</aws.java.sdk.version>
<mockito.scala.version>1.15.0</mockito.scala.version>
<commons.version>0.0.27</commons.version>
<typesafe.config.version>1.4.1</typesafe.config.version>
<hadoop.version>2.8.5</hadoop.version>

<!-- Spark versions -->
<spark-24.version>2.4.6</spark-24.version>
