Skip to content

Commit

Permalink
Rename AsyncFileHashingStrategy -> FileHashStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
jgainerdewar committed Jan 24, 2025
1 parent 6854497 commit 61eae83
Show file tree
Hide file tree
Showing 16 changed files with 65 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class DefaultStandardFileHashingActor(standardParams: StandardFileHashingActorPa
extends StandardFileHashingActor(standardParams) {
override val ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

override val defaultHashingStrategies: Map[String, AsyncFileHashingStrategy] = Map(
("drs", AsyncFileHashingStrategy.Crc32c)
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("drs", FileHashStrategy.Crc32c)
)
}

Expand Down Expand Up @@ -88,19 +88,19 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
override lazy val configurationDescriptor: BackendConfigurationDescriptor = standardParams.configurationDescriptor

// Child classes can override to set per-filesystem defaults
val defaultHashingStrategies: Map[String, AsyncFileHashingStrategy] = Map.empty
val defaultHashingStrategies: Map[String, FileHashStrategy] = Map.empty

// Hashing strategy to use if none is configured.
val fallbackHashingStrategy: AsyncFileHashingStrategy = AsyncFileHashingStrategy.Md5
val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy.Md5

// Combines defaultHashingStrategies with user-provided configuration
lazy val hashingStrategies: Map[String, AsyncFileHashingStrategy] = {
lazy val hashingStrategies: Map[String, FileHashStrategy] = {

val configuredHashingStrategies = for {
fsConfigs <- configurationDescriptor.backendConfig.as[Option[Config]]("filesystems").toList
fsKey <- fsConfigs.entrySet.asScala.map(_.getKey)
fileHashStrategyName <- fsConfigs.as[Option[String]](s"fileSystems.${fsKey}.caching.hash-strategy")
fileHashStrategy <- AsyncFileHashingStrategy(fileHashStrategyName)
fileHashStrategy <- FileHashStrategy(fileHashStrategyName)
_ = log.info(s"Call caching hash strategy for ${fsKey} files will be ${fileHashStrategy}")
} yield (fsKey, fileHashStrategy)

Expand All @@ -118,7 +118,7 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
// Used by ConfigBackend for synchronous hashing of local files
def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = None

def hashStrategyForPath(p: Path): AsyncFileHashingStrategy =
def hashStrategyForPath(p: Path): FileHashStrategy =
hashingStrategies.getOrElse(p.filesystemTypeKey, fallbackHashingStrategy)

def fileHashingReceive: Receive = {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cromwell.core.callcaching

/**
  * File hashing strategies used by IoHashCommand, primarily when obtaining file hashes
  * for call caching purposes.
  *
  * The trait is sealed: the case objects declared in the companion object are the
  * complete set of strategies.
  */
sealed trait FileHashStrategy

object FileHashStrategy {
  case object Crc32c extends FileHashStrategy
  case object Md5 extends FileHashStrategy
  case object Md5ThenIdentity extends FileHashStrategy
  case object ETag extends FileHashStrategy

  // TODO validate fs type here?
  /**
    * Looks up a hashing strategy by name, ignoring letter case.
    *
    * Recognized names are "md5", "crc32c", "md5+identity" and "etag".
    *
    * @param s the strategy name to look up
    * @return the matching strategy, or `None` when the name is not recognized
    */
  def apply(s: String): Option[FileHashStrategy] =
    // Lower-case with a fixed locale. The no-argument toLowerCase() follows the JVM
    // default locale, and under e.g. a Turkish locale "MD5+IDENTITY" lower-cases to
    // "md5+ıdentıty" (dotless i), which would fail to match below.
    s.toLowerCase(java.util.Locale.ROOT) match {
      case "md5" => Some(Md5)
      case "crc32c" => Some(Crc32c)
      case "md5+identity" => Some(Md5ThenIdentity)
      case "etag" => Some(ETag)
      case _ => None
    }
}
4 changes: 2 additions & 2 deletions core/src/main/scala/cromwell/core/io/AsyncIo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cromwell.core.io

import akka.actor.ActorRef
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoPromiseProxyActor.IoCommandWithPromise
import cromwell.core.path.BetterFileMethods.OpenOptions
import cromwell.core.path.Path
Expand Down Expand Up @@ -48,7 +48,7 @@ class AsyncIo(ioEndpoint: ActorRef, ioCommandBuilder: IoCommandBuilder) {
def sizeAsync(path: Path): Future[Long] =
asyncCommand(ioCommandBuilder.sizeCommand(path))

def hashAsync(path: Path, hashStrategy: AsyncFileHashingStrategy): Future[String] =
def hashAsync(path: Path, hashStrategy: FileHashStrategy): Future[String] =
asyncCommand(ioCommandBuilder.hashCommand(path, hashStrategy))

def deleteAsync(path: Path, swallowIoExceptions: Boolean = false): Future[Unit] =
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cromwell.core.io

import better.files.File.OpenOptions
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path

Expand Down Expand Up @@ -43,7 +43,7 @@ object DefaultIoCommand {
s"DefaultIoDeleteCommand file '$file' swallowIOExceptions '$swallowIOExceptions'"
}

case class DefaultIoHashCommand(override val file: Path, override val hashStrategy: AsyncFileHashingStrategy)
case class DefaultIoHashCommand(override val file: Path, override val hashStrategy: FileHashStrategy)
extends IoHashCommand(file, hashStrategy) {
override def commandDescription: String = s"DefaultIoHashCommand file '$file' hashStrategy '$hashStrategy'"
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/cromwell/core/io/IoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import better.files.File.OpenOptions
import com.google.api.client.util.ExponentialBackOff
import common.util.Backoff
import common.util.StringUtil.EnhancedToStringable
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path
import cromwell.core.retry.SimpleExponentialBackoff
Expand Down Expand Up @@ -161,7 +161,7 @@ abstract class IoDeleteCommand(val file: Path, val swallowIOExceptions: Boolean)
/**
* Get Hash value for file
*/
abstract class IoHashCommand(val file: Path, val hashStrategy: AsyncFileHashingStrategy)
abstract class IoHashCommand(val file: Path, val hashStrategy: FileHashStrategy)
extends SingleFileIoCommand[String] {
override def toString = s"get $hashStrategy hash of ${file.pathAsString}"
override lazy val name = "hash"
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package cromwell.core.io

import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.DefaultIoCommand._
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.BetterFileMethods.OpenOptions
Expand All @@ -19,7 +19,7 @@ abstract class PartialIoCommandBuilder {
def sizeCommand: PartialFunction[Path, Try[IoSizeCommand]] = PartialFunction.empty
def deleteCommand: PartialFunction[(Path, Boolean), Try[IoDeleteCommand]] = PartialFunction.empty
def copyCommand: PartialFunction[(Path, Path), Try[IoCopyCommand]] = PartialFunction.empty
def hashCommand: PartialFunction[(Path, AsyncFileHashingStrategy), Try[IoHashCommand]] = PartialFunction.empty
def hashCommand: PartialFunction[(Path, FileHashStrategy), Try[IoHashCommand]] = PartialFunction.empty
def touchCommand: PartialFunction[Path, Try[IoTouchCommand]] = PartialFunction.empty
def existsCommand: PartialFunction[Path, Try[IoExistsCommand]] = PartialFunction.empty
def isDirectoryCommand: PartialFunction[Path, Try[IoIsDirectoryCommand]] = PartialFunction.empty
Expand Down Expand Up @@ -86,7 +86,7 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp
def copyCommand(src: Path, dest: Path): Try[IoCopyCommand] =
buildOrDefault(_.copyCommand, (src, dest), DefaultIoCopyCommand(src, dest))

def hashCommand(file: Path, hashStrategy: AsyncFileHashingStrategy): Try[IoHashCommand] =
def hashCommand(file: Path, hashStrategy: FileHashStrategy): Try[IoHashCommand] =
buildOrDefault(_.hashCommand, (file, hashStrategy), DefaultIoHashCommand(file, hashStrategy))

def touchCommand(file: Path): Try[IoTouchCommand] =
Expand Down
4 changes: 2 additions & 2 deletions engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cromwell.engine.io.nio
import cats.effect.IO
import cloud.nio.spi.{FileHash, HashType}
import common.util.StringUtil.EnhancedString
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.path.Path
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
Expand All @@ -17,7 +17,7 @@ import scala.util.Try
object NioHashing {

// TODO update logic to respect hashStrategy
def hash(file: Path, hashStrategy: AsyncFileHashingStrategy): IO[String] =
def hash(file: Path, hashStrategy: FileHashStrategy): IO[String] =
// If there is no hash accessible from the file storage system,
// we'll read the file and generate the hash ourselves if we can.
getStoredHash(file)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package cromwell.filesystems.gcs.batch

import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io._
import cromwell.core.path.Path
import cromwell.filesystems.gcs.GcsPath
Expand All @@ -20,7 +20,7 @@ private case object PartialGcsBatchCommandBuilder extends PartialIoCommandBuilde
case (gcsSrc: GcsPath, gcsDest: GcsPath) => GcsBatchCopyCommand.forPaths(gcsSrc, gcsDest)
}

override def hashCommand: PartialFunction[(Path, AsyncFileHashingStrategy), Try[GcsBatchHashCommand]] = {
override def hashCommand: PartialFunction[(Path, FileHashStrategy), Try[GcsBatchHashCommand]] = {
case (gcsPath: GcsPath, s) =>
GcsBatchHashCommand.forPath(gcsPath, s)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.google.api.services.storage.model.{Objects, RewriteResponse, StorageO
import com.google.cloud.storage.BlobId
import common.util.StringUtil._
import common.validation.ErrorOr.ErrorOr
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io._
import cromwell.filesystems.gcs._
import mouse.all._
Expand Down Expand Up @@ -176,16 +176,16 @@ object GcsBatchSizeCommand {
}

case class GcsBatchHashCommand(override val file: GcsPath,
override val hashStrategy: AsyncFileHashingStrategy,
override val hashStrategy: FileHashStrategy,
val blob: BlobId,
setUserProject: Boolean = false
) extends IoHashCommand(file, hashStrategy)
with GcsBatchGetCommand[String] {
override def mapGoogleResponse(response: StorageObject): ErrorOr[String] =
hashStrategy match {
case AsyncFileHashingStrategy.Crc32c => getCrc32c(response)
case AsyncFileHashingStrategy.Md5 => getMd5(response)
case AsyncFileHashingStrategy.Md5ThenIdentity => getMd5(response).orElse(getIdentity(response))
case FileHashStrategy.Crc32c => getCrc32c(response)
case FileHashStrategy.Md5 => getMd5(response)
case FileHashStrategy.Md5ThenIdentity => getMd5(response).orElse(getIdentity(response))
case _ => s"Hash strategy $hashStrategy is not supported by GCS".invalidNel
}

Expand All @@ -210,7 +210,7 @@ case class GcsBatchHashCommand(override val file: GcsPath,
}

object GcsBatchHashCommand {
def forPath(file: GcsPath, hashStrategy: AsyncFileHashingStrategy): Try[GcsBatchHashCommand] =
def forPath(file: GcsPath, hashStrategy: FileHashStrategy): Try[GcsBatchHashCommand] =
file.objectBlobId.map(GcsBatchHashCommand(file, hashStrategy, _))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
package cromwell.filesystems.s3.batch

import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.{IoCommandBuilder, PartialIoCommandBuilder}
import cromwell.core.path.Path
import cromwell.filesystems.s3.S3Path
Expand All @@ -53,7 +53,7 @@ private case object PartialS3BatchCommandBuilder extends PartialIoCommandBuilder
case (src: S3Path, dest: S3Path) => Try(S3BatchCopyCommand(src, dest))
}

override def hashCommand: PartialFunction[(Path, AsyncFileHashingStrategy), Try[S3BatchHashCommand]] = {
override def hashCommand: PartialFunction[(Path, FileHashStrategy), Try[S3BatchHashCommand]] = {
case (path: S3Path, s) =>
Try(S3BatchHashCommand(path, s))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
package cromwell.filesystems.s3.batch

import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import software.amazon.awssdk.core.exception.SdkException
import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, HeadObjectResponse, NoSuchKeyException}
import cromwell.core.io.{
Expand Down Expand Up @@ -111,12 +111,12 @@ case class S3BatchSizeCommand(override val file: S3Path) extends IoSizeCommand(f
* `IoCommand` to find the hash of an s3 object (the `Etag`)
* @param file the path to the object
*/
case class S3BatchHashCommand(override val file: S3Path, override val hashStrategy: AsyncFileHashingStrategy)
case class S3BatchHashCommand(override val file: S3Path, override val hashStrategy: FileHashStrategy)
extends IoHashCommand(file, hashStrategy)
with S3BatchHeadCommand[String] {
// TODO handle other hash strategies
override def mapResponse(response: HeadObjectResponse): String = hashStrategy match {
case AsyncFileHashingStrategy.ETag => response.eTag
case FileHashStrategy.ETag => response.eTag
}
override def commandDescription: String = s"S3BatchEtagCommand file '$file' with hashStrategy '$hashStrategy'"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.google.common.io.BaseEncoding
import com.google.common.primitives.Longs
import common.util.StringUtil.EnhancedToStringable
import common.util.TimeUtil.EnhancedOffsetDateTime
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.{AsyncIo, DefaultIoCommandBuilder}
import cromwell.core.path.{Path, PathFactory}
import cromwell.core.instrumentation.InstrumentationPrefixes.ServicesPrefix
Expand Down Expand Up @@ -315,7 +315,7 @@ class ArchiveMetadataSchedulerActor(archiveMetadataConfig: ArchiveMetadataConfig
ServicesPrefix
)
expectedChecksum = crc32cStream.checksumString
uploadedChecksum <- asyncIo.hashAsync(path, AsyncFileHashingStrategy.Crc32c)
uploadedChecksum <- asyncIo.hashAsync(path, FileHashStrategy.Crc32c)
checksumValidatedTime = OffsetDateTime.now()
_ = sendTiming(archiverStreamTimingMetricsBasePath :+ "checksum_validation",
calculateTimeDifference(streamingCompleteTime, checksumValidatedTime),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
import cromwell.backend.BackendInitializationData
import cromwell.backend.impl.aws.AwsBatchBackendInitializationData
import cromwell.backend.impl.aws.AWSBatchStorageSystems
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.DefaultIoCommandBuilder

class AwsBatchBackendFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {

override val defaultHashingStrategies: Map[String, AsyncFileHashingStrategy] = Map(
("s3", AsyncFileHashingStrategy.ETag)
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("s3", FileHashStrategy.ETag)
)

override val ioCommandBuilder = BackendInitializationData
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package cromwell.backend.google.batch.callcaching

import cromwell.backend.standard.callcaching.{StandardFileHashingActor, StandardFileHashingActorParams}
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder

class BatchBackendFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val ioCommandBuilder = GcsBatchCommandBuilder

override val defaultHashingStrategies: Map[String, AsyncFileHashingStrategy] = Map(
("gcs", AsyncFileHashingStrategy.Crc32c),
("drs", AsyncFileHashingStrategy.Crc32c)
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("gcs", FileHashStrategy.Crc32c),
("drs", FileHashStrategy.Crc32c)
)
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package cromwell.backend.google.pipelines.common.callcaching

import cromwell.backend.standard.callcaching.{StandardFileHashingActor, StandardFileHashingActorParams}
import cromwell.core.callcaching.AsyncFileHashingStrategy
import cromwell.core.callcaching.FileHashStrategy
import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder

class PipelinesApiBackendFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val ioCommandBuilder = GcsBatchCommandBuilder

override val defaultHashingStrategies: Map[String, AsyncFileHashingStrategy] = Map(
("gcs", AsyncFileHashingStrategy.Crc32c),
("drs", AsyncFileHashingStrategy.Crc32c)
override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("gcs", FileHashStrategy.Crc32c),
("drs", FileHashStrategy.Crc32c)
)
}

0 comments on commit 61eae83

Please sign in to comment.