Skip to content

Commit

Permalink
apacheGH-40037: [C++][FS][Azure] Make attempted reads and writes agai…
Browse files Browse the repository at this point in the history
…nst directories fail fast (apache#40119)

### Rationale for this change
Prevent confusion if a user attempts to read or write a directory. 

### What changes are included in this PR?
- Make `ObjectAppendStream::Flush` a noop if `ObjectAppendStream::Init` has not run successfully. This avoids an unhandled error when the destructor calls flush. 
- Check blob properties for directory marker metadata when initialising `ObjectInputFile` or `ObjectAppendStream`. 
- When initialising `ObjectAppendStream` call `GetFileInfo` if it is a flat namespace account.

### Are these changes tested?
Add new tests `DisallowReadingOrWritingDirectoryMarkers` and `DisallowCreatingFileAndDirectoryWithTheSameName` to cover the new fail fast behaviour. 
Also updated `WriteMetadata` to ensure that my changes to Flush didn't break setting metadata without calling `Write` on the stream. 

### Are there any user-facing changes?
Yes. Invalid read and write operations will now fail fast and gracefully. Previously could get into a confusing state where there were files and directories at the same path and there were some un-graceful failures. 

* Closes: apache#40037

Authored-by: Thomas Newton <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
  • Loading branch information
Tom-Newton authored Feb 21, 2024
1 parent c344446 commit 8a62f30
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 20 deletions.
95 changes: 75 additions & 20 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,22 @@ bool IsContainerNotFound(const Storage::StorageException& e) {
return false;
}

const auto kHierarchicalNamespaceIsDirectoryMetadataKey = "hdi_isFolder";
const auto kFlatNamespaceIsDirectoryMetadataKey = "is_directory";

bool MetadataIndicatesIsDirectory(const Storage::Metadata& metadata) {
// Inspired by
// https://github.com/Azure/azure-sdk-for-cpp/blob/12407e8bfcb9bc1aa43b253c1d0ec93bf795ae3b/sdk/storage/azure-storage-files-datalake/src/datalake_utilities.cpp#L86-L91
auto hierarchical_directory_metadata =
metadata.find(kHierarchicalNamespaceIsDirectoryMetadataKey);
if (hierarchical_directory_metadata != metadata.end()) {
return hierarchical_directory_metadata->second == "true";
}
auto flat_directory_metadata = metadata.find(kFlatNamespaceIsDirectoryMetadataKey);
return flat_directory_metadata != metadata.end() &&
flat_directory_metadata->second == "true";
}

template <typename ArrowType>
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
struct StringAppender {
Expand Down Expand Up @@ -512,11 +528,18 @@ class ObjectInputFile final : public io::RandomAccessFile {

Status Init() {
if (content_length_ != kNoSize) {
// When the user provides the file size we don't validate that its a file. This is
// only a read so its not a big deal if the user makes a mistake.
DCHECK_GE(content_length_, 0);
return Status::OK();
}
try {
// To open an ObjectInputFile the Blob must exist and it must not represent
// a directory. Additionally we need to know the file size.
auto properties = blob_client_->GetProperties();
if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
return NotAFile(location_);
}
content_length_ = properties.Value.BlobSize;
metadata_ = PropertiesToMetadata(properties.Value);
return Status::OK();
Expand Down Expand Up @@ -698,11 +721,10 @@ class ObjectAppendStream final : public io::OutputStream {
ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
const AzureOptions& options, int64_t size = kNoSize)
const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
location_(location),
content_length_(size) {
location_(location) {
if (metadata && metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(metadata);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
Expand All @@ -716,17 +738,31 @@ class ObjectAppendStream final : public io::OutputStream {
io::internal::CloseFromDestructor(this);
}

Status Init() {
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
pos_ = content_length_;
Status Init(const bool truncate,
std::function<Status()> ensure_not_flat_namespace_directory) {
if (truncate) {
content_length_ = 0;
pos_ = 0;
// We need to create an empty file overwriting any existing file, but
// fail if there is an existing directory.
RETURN_NOT_OK(ensure_not_flat_namespace_directory());
// On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing
// directory so we don't need to check like we do on flat namespace.
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
try {
auto properties = block_blob_client_->GetProperties();
if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
return NotAFile(location_);
}
content_length_ = properties.Value.BlobSize;
pos_ = content_length_;
} catch (const Storage::StorageException& exception) {
if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
// No file exists but on flat namespace its possible there is a directory
// marker or an implied directory. Ensure there is no directory before starting
// a new empty file.
RETURN_NOT_OK(ensure_not_flat_namespace_directory());
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
return ExceptionToStatus(
Expand All @@ -743,6 +779,7 @@ class ObjectAppendStream final : public io::OutputStream {
block_ids_.push_back(block.Name);
}
}
initialised_ = true;
return Status::OK();
}

Expand Down Expand Up @@ -789,6 +826,11 @@ class ObjectAppendStream final : public io::OutputStream {

Status Flush() override {
RETURN_NOT_OK(CheckClosed("flush"));
if (!initialised_) {
// If the stream has not been successfully initialized then there is nothing to
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}
return CommitBlockList(block_blob_client_, block_ids_, metadata_);
}

Expand Down Expand Up @@ -840,10 +882,11 @@ class ObjectAppendStream final : public io::OutputStream {
std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
int64_t content_length_ = kNoSize;

bool closed_ = false;
bool initialised_ = false;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
std::vector<std::string> block_ids_;
Storage::Metadata metadata_;
};
Expand Down Expand Up @@ -1666,20 +1709,32 @@ class AzureFileSystem::Impl {
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));

const auto blob_container_client = GetBlobContainerClient(location.container);
auto block_blob_client = std::make_shared<Blobs::BlockBlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
.GetBlockBlobClient(location.path));
blob_container_client.GetBlockBlobClient(location.path));

auto ensure_not_flat_namespace_directory = [this, location,
blob_container_client]() -> Status {
ARROW_ASSIGN_OR_RAISE(
auto hns_support,
HierarchicalNamespaceSupport(GetFileSystemClient(location.container)));
if (hns_support == HNSSupport::kDisabled) {
// Flat namespace so we need to GetFileInfo in-case its a directory.
ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client, location))
if (status.type() == FileType::Directory) {
return NotAFile(location);
}
}
// kContainerNotFound - it doesn't exist, so no need to check if its a directory.
// kEnabled - hierarchical namespace so Azure APIs will fail if its a directory. We
// don't need to explicitly check.
return Status::OK();
};

std::shared_ptr<ObjectAppendStream> stream;
if (truncate) {
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client));
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_, 0);
} else {
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
}
RETURN_NOT_OK(stream->Init());
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
RETURN_NOT_OK(stream->Init(truncate, ensure_not_flat_namespace_directory));
return stream;
}

Expand All @@ -1694,7 +1749,7 @@ class AzureFileSystem::Impl {
// on directory marker blobs.
// https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870
Blobs::UploadBlockBlobFromOptions blob_options;
blob_options.Metadata.emplace("is_directory", "true");
blob_options.Metadata.emplace(kFlatNamespaceIsDirectoryMetadataKey, "true");
block_blob_client.UploadFrom(nullptr, 0, blob_options);
}

Expand Down
60 changes: 60 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,41 @@ class TestAzureFileSystem : public ::testing::Test {
AssertFileInfo(fs(), subdir3, FileType::Directory);
}

void TestDisallowReadingOrWritingDirectoryMarkers() {
auto data = SetUpPreexistingData();
auto directory_path = data.Path("directory");

ASSERT_OK(fs()->CreateDir(directory_path));
ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path));

auto directory_path_with_slash = directory_path + "/";
ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path_with_slash));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path_with_slash));
}

void TestDisallowCreatingFileAndDirectoryWithTheSameName() {
auto data = SetUpPreexistingData();
auto path1 = data.Path("directory1");
ASSERT_OK(fs()->CreateDir(path1));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(path1));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(path1));
AssertFileInfo(fs(), path1, FileType::Directory);

auto path2 = data.Path("directory2");
ASSERT_OK(fs()->OpenOutputStream(path2));
// CreateDir returns OK even if there is already a file or directory at this
// location. Whether or not this is the desired behaviour is debatable.
ASSERT_OK(fs()->CreateDir(path2));
AssertFileInfo(fs(), path2, FileType::File);
}

void TestOpenOutputStreamWithMissingContainer() {
ASSERT_RAISES(IOError, fs()->OpenOutputStream("not-a-container/file", {}));
}

void TestDeleteDirSuccessEmpty() {
if (HasSubmitBatchBug()) {
GTEST_SKIP() << kSubmitBatchBugMessage;
Expand Down Expand Up @@ -1665,6 +1700,19 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnMissingContainer) {
this->TestCreateDirOnMissingContainer();
}

TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryMarkers) {
this->TestDisallowReadingOrWritingDirectoryMarkers();
}

TYPED_TEST(TestAzureFileSystemOnAllScenarios,
DisallowCreatingFileAndDirectoryWithTheSameName) {
this->TestDisallowCreatingFileAndDirectoryWithTheSameName();
}

TYPED_TEST(TestAzureFileSystemOnAllScenarios, OpenOutputStreamWithMissingContainer) {
this->TestOpenOutputStreamWithMissingContainer();
}

TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) {
this->TestDeleteDirSuccessEmpty();
}
Expand Down Expand Up @@ -2232,6 +2280,18 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
.Value.Metadata;
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata);

// Metadata can be written without writing any data.
ASSERT_OK_AND_ASSIGN(
output, fs_with_defaults->OpenAppendStream(
full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}})));
ASSERT_OK(output->Close());
blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
.Value.Metadata;
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata);
}

TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {
Expand Down

0 comments on commit 8a62f30

Please sign in to comment.