From 8a62f30d34a606c8edca6cfaad56846e0e7aceea Mon Sep 17 00:00:00 2001
From: Thomas Newton <thomas.w.newton@gmail.com>
Date: Wed, 21 Feb 2024 16:29:18 +0000
Subject: [PATCH] GH-40037: [C++][FS][Azure] Make attempted reads and writes
 against directories fail fast (#40119)

### Rationale for this change
Prevent confusion if a user attempts to read or write a directory.

### What changes are included in this PR?
- Make `ObjectAppendStream::Flush` a no-op if `ObjectAppendStream::Init` has not run successfully. This avoids an unhandled error when the destructor calls `Flush`.
- Check blob properties for directory marker metadata when initialising `ObjectInputFile` or `ObjectAppendStream` (the check is sketched below).
- When initialising `ObjectAppendStream` on a flat namespace account, call `GetFileInfo` to make sure the target is not a directory.
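
For context, a minimal sketch of that directory-marker check; the metadata keys match the constants added in `azurefs.cc` below, but the standalone helper and the plain `std::map` are illustrative only (the patch itself works on `Storage::Metadata`):

```cpp
#include <map>
#include <string>

// Hierarchical namespace accounts tag directory blobs with "hdi_isFolder",
// while flat namespace accounts use the adlfs convention "is_directory".
bool LooksLikeDirectoryMarker(const std::map<std::string, std::string>& metadata) {
  if (auto it = metadata.find("hdi_isFolder"); it != metadata.end()) {
    return it->second == "true";
  }
  auto it = metadata.find("is_directory");
  return it != metadata.end() && it->second == "true";
}
```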

### Are these changes tested?
Added new tests `DisallowReadingOrWritingDirectoryMarkers`, `DisallowCreatingFileAndDirectoryWithTheSameName`, and `OpenOutputStreamWithMissingContainer` to cover the new fail-fast behaviour.
Also updated `WriteMetadata` to ensure that my changes to `Flush` didn't break setting metadata without calling `Write` on the stream (see the sketch below).
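
A minimal sketch of that metadata-only scenario (the helper name and path are hypothetical; the real assertions live in the updated `WriteMetadata` test below):

```cpp
#include <memory>
#include <string>

#include "arrow/filesystem/filesystem.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/key_value_metadata.h"

arrow::Status SetMetadataWithoutWriting(const std::shared_ptr<arrow::fs::FileSystem>& fs,
                                        const std::string& path) {
  // Open an append stream with metadata and close it without ever calling
  // Write(); the metadata should still be committed on Close().
  ARROW_ASSIGN_OR_RAISE(
      auto stream,
      fs->OpenAppendStream(path, arrow::key_value_metadata({{"bar", "baz"}})));
  return stream->Close();
}
```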

### Are there any user-facing changes?
Yes. Invalid read and write operations now fail fast and gracefully. Previously it was possible to get into a confusing state where a file and a directory existed at the same path, and some failures were not handled gracefully.
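
For illustration, a minimal sketch of the new behaviour; the container and directory names are hypothetical and `fs` is assumed to be an already-constructed Azure filesystem:

```cpp
#include <cassert>
#include <memory>

#include "arrow/filesystem/filesystem.h"
#include "arrow/status.h"

arrow::Status Demo(const std::shared_ptr<arrow::fs::FileSystem>& fs) {
  ARROW_RETURN_NOT_OK(fs->CreateDir("my-container/some/dir"));
  // Each of these now returns an IOError immediately instead of failing
  // later in a confusing way.
  auto read = fs->OpenInputFile("my-container/some/dir");
  auto write = fs->OpenOutputStream("my-container/some/dir");
  auto append = fs->OpenAppendStream("my-container/some/dir");
  assert(!read.ok() && !write.ok() && !append.ok());
  return arrow::Status::OK();
}
```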

* Closes: #40037

Authored-by: Thomas Newton <thomas.w.newton@gmail.com>
Signed-off-by: Felipe Oliveira Carvalho <felipekde@gmail.com>
---
 cpp/src/arrow/filesystem/azurefs.cc      | 95 +++++++++++++++++++-----
 cpp/src/arrow/filesystem/azurefs_test.cc | 60 +++++++++++++++
 2 files changed, 135 insertions(+), 20 deletions(-)

diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index de7cdba245ada..8ae33b8818827 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -347,6 +347,22 @@ bool IsContainerNotFound(const Storage::StorageException& e) {
   return false;
 }
 
+const auto kHierarchicalNamespaceIsDirectoryMetadataKey = "hdi_isFolder";
+const auto kFlatNamespaceIsDirectoryMetadataKey = "is_directory";
+
+bool MetadataIndicatesIsDirectory(const Storage::Metadata& metadata) {
+  // Inspired by
+  // https://github.com/Azure/azure-sdk-for-cpp/blob/12407e8bfcb9bc1aa43b253c1d0ec93bf795ae3b/sdk/storage/azure-storage-files-datalake/src/datalake_utilities.cpp#L86-L91
+  auto hierarchical_directory_metadata =
+      metadata.find(kHierarchicalNamespaceIsDirectoryMetadataKey);
+  if (hierarchical_directory_metadata != metadata.end()) {
+    return hierarchical_directory_metadata->second == "true";
+  }
+  auto flat_directory_metadata = metadata.find(kFlatNamespaceIsDirectoryMetadataKey);
+  return flat_directory_metadata != metadata.end() &&
+         flat_directory_metadata->second == "true";
+}
+
 template <typename ArrowType>
 std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
   struct StringAppender {
@@ -512,11 +528,18 @@ class ObjectInputFile final : public io::RandomAccessFile {
 
   Status Init() {
     if (content_length_ != kNoSize) {
+      // When the user provides the file size we don't validate that it's a file. This
+      // is only a read, so it's not a big deal if the user makes a mistake.
       DCHECK_GE(content_length_, 0);
       return Status::OK();
     }
     try {
+      // To open an ObjectInputFile the Blob must exist and it must not represent
+      // a directory. Additionally, we need to know the file size.
       auto properties = blob_client_->GetProperties();
+      if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
+        return NotAFile(location_);
+      }
       content_length_ = properties.Value.BlobSize;
       metadata_ = PropertiesToMetadata(properties.Value);
       return Status::OK();
@@ -698,11 +721,10 @@ class ObjectAppendStream final : public io::OutputStream {
   ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
                      const io::IOContext& io_context, const AzureLocation& location,
                      const std::shared_ptr<const KeyValueMetadata>& metadata,
-                     const AzureOptions& options, int64_t size = kNoSize)
+                     const AzureOptions& options)
       : block_blob_client_(std::move(block_blob_client)),
         io_context_(io_context),
-        location_(location),
-        content_length_(size) {
+        location_(location) {
     if (metadata && metadata->size() != 0) {
       metadata_ = ArrowMetadataToAzureMetadata(metadata);
     } else if (options.default_metadata && options.default_metadata->size() != 0) {
@@ -716,17 +738,31 @@ class ObjectAppendStream final : public io::OutputStream {
     io::internal::CloseFromDestructor(this);
   }
 
-  Status Init() {
-    if (content_length_ != kNoSize) {
-      DCHECK_GE(content_length_, 0);
-      pos_ = content_length_;
+  Status Init(const bool truncate,
+              std::function<Status()> ensure_not_flat_namespace_directory) {
+    if (truncate) {
+      content_length_ = 0;
+      pos_ = 0;
+      // We need to create an empty file overwriting any existing file, but
+      // fail if there is an existing directory.
+      RETURN_NOT_OK(ensure_not_flat_namespace_directory());
+      // On hierarchical namespace accounts CreateEmptyBlockBlob will fail if there
+      // is an existing directory, so we don't need to check as we do on flat namespace.
+      RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
     } else {
       try {
         auto properties = block_blob_client_->GetProperties();
+        if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
+          return NotAFile(location_);
+        }
         content_length_ = properties.Value.BlobSize;
         pos_ = content_length_;
       } catch (const Storage::StorageException& exception) {
         if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
+          // No file exists, but on a flat namespace account it's possible there is a
+          // directory marker or an implied directory. Ensure there is no directory
+          // before starting a new empty file.
+          RETURN_NOT_OK(ensure_not_flat_namespace_directory());
           RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
         } else {
           return ExceptionToStatus(
@@ -743,6 +779,7 @@ class ObjectAppendStream final : public io::OutputStream {
         block_ids_.push_back(block.Name);
       }
     }
+    initialised_ = true;
     return Status::OK();
   }
 
@@ -789,6 +826,11 @@ class ObjectAppendStream final : public io::OutputStream {
 
   Status Flush() override {
     RETURN_NOT_OK(CheckClosed("flush"));
+    if (!initialised_) {
+      // If the stream has not been successfully initialized then there is nothing to
+      // flush. This also avoids some unhandled errors when flushing in the destructor.
+      return Status::OK();
+    }
     return CommitBlockList(block_blob_client_, block_ids_, metadata_);
   }
 
@@ -840,10 +882,11 @@ class ObjectAppendStream final : public io::OutputStream {
   std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
   const io::IOContext io_context_;
   const AzureLocation location_;
+  int64_t content_length_ = kNoSize;
 
   bool closed_ = false;
+  bool initialised_ = false;
   int64_t pos_ = 0;
-  int64_t content_length_ = kNoSize;
   std::vector<std::string> block_ids_;
   Storage::Metadata metadata_;
 };
@@ -1666,20 +1709,32 @@ class AzureFileSystem::Impl {
       AzureFileSystem* fs) {
     RETURN_NOT_OK(ValidateFileLocation(location));
 
+    const auto blob_container_client = GetBlobContainerClient(location.container);
     auto block_blob_client = std::make_shared<Blobs::BlockBlobClient>(
-        blob_service_client_->GetBlobContainerClient(location.container)
-            .GetBlockBlobClient(location.path));
+        blob_container_client.GetBlockBlobClient(location.path));
+
+    auto ensure_not_flat_namespace_directory = [this, location,
+                                                blob_container_client]() -> Status {
+      ARROW_ASSIGN_OR_RAISE(
+          auto hns_support,
+          HierarchicalNamespaceSupport(GetFileSystemClient(location.container)));
+      if (hns_support == HNSSupport::kDisabled) {
+        // Flat namespace, so we need to call GetFileInfo in case it's a directory.
+        ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client, location))
+        if (status.type() == FileType::Directory) {
+          return NotAFile(location);
+        }
+      }
+      // kContainerNotFound - it doesn't exist, so no need to check if it's a directory.
+      // kEnabled - hierarchical namespace, so Azure APIs will fail if it's a directory.
+      // We don't need to check explicitly.
+      return Status::OK();
+    };
 
     std::shared_ptr<ObjectAppendStream> stream;
-    if (truncate) {
-      RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client));
-      stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
-                                                    location, metadata, options_, 0);
-    } else {
-      stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
-                                                    location, metadata, options_);
-    }
-    RETURN_NOT_OK(stream->Init());
+    stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
+                                                  location, metadata, options_);
+    RETURN_NOT_OK(stream->Init(truncate, ensure_not_flat_namespace_directory));
     return stream;
   }
 
@@ -1694,7 +1749,7 @@ class AzureFileSystem::Impl {
     // on directory marker blobs.
     // https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870
     Blobs::UploadBlockBlobFromOptions blob_options;
-    blob_options.Metadata.emplace("is_directory", "true");
+    blob_options.Metadata.emplace(kFlatNamespaceIsDirectoryMetadataKey, "true");
     block_blob_client.UploadFrom(nullptr, 0, blob_options);
   }
 
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index 7f5cd247a8d35..f21876f03cc95 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -838,6 +838,41 @@ class TestAzureFileSystem : public ::testing::Test {
     AssertFileInfo(fs(), subdir3, FileType::Directory);
   }
 
+  void TestDisallowReadingOrWritingDirectoryMarkers() {
+    auto data = SetUpPreexistingData();
+    auto directory_path = data.Path("directory");
+
+    ASSERT_OK(fs()->CreateDir(directory_path));
+    ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path));
+    ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path));
+    ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path));
+
+    auto directory_path_with_slash = directory_path + "/";
+    ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash));
+    ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path_with_slash));
+    ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path_with_slash));
+  }
+
+  void TestDisallowCreatingFileAndDirectoryWithTheSameName() {
+    auto data = SetUpPreexistingData();
+    auto path1 = data.Path("directory1");
+    ASSERT_OK(fs()->CreateDir(path1));
+    ASSERT_RAISES(IOError, fs()->OpenOutputStream(path1));
+    ASSERT_RAISES(IOError, fs()->OpenAppendStream(path1));
+    AssertFileInfo(fs(), path1, FileType::Directory);
+
+    auto path2 = data.Path("directory2");
+    ASSERT_OK(fs()->OpenOutputStream(path2));
+    // CreateDir returns OK even if there is already a file or directory at this
+    // location. Whether or not this is the desired behaviour is debatable.
+    ASSERT_OK(fs()->CreateDir(path2));
+    AssertFileInfo(fs(), path2, FileType::File);
+  }
+
+  void TestOpenOutputStreamWithMissingContainer() {
+    ASSERT_RAISES(IOError, fs()->OpenOutputStream("not-a-container/file", {}));
+  }
+
   void TestDeleteDirSuccessEmpty() {
     if (HasSubmitBatchBug()) {
       GTEST_SKIP() << kSubmitBatchBugMessage;
@@ -1665,6 +1700,19 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnMissingContainer) {
   this->TestCreateDirOnMissingContainer();
 }
 
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryMarkers) {
+  this->TestDisallowReadingOrWritingDirectoryMarkers();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllScenarios,
+           DisallowCreatingFileAndDirectoryWithTheSameName) {
+  this->TestDisallowCreatingFileAndDirectoryWithTheSameName();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, OpenOutputStreamWithMissingContainer) {
+  this->TestOpenOutputStreamWithMissingContainer();
+}
+
 TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) {
   this->TestDeleteDirSuccessEmpty();
 }
@@ -2232,6 +2280,18 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
                       .Value.Metadata;
   // Defaults are overwritten and not merged.
   EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata);
+
+  // Metadata can be written without writing any data.
+  ASSERT_OK_AND_ASSIGN(
+      output, fs_with_defaults->OpenAppendStream(
+                  full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}})));
+  ASSERT_OK(output->Close());
+  blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
+                      .GetBlockBlobClient(blob_path)
+                      .GetProperties()
+                      .Value.Metadata;
+  // Defaults are overwritten and not merged.
+  EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata);
 }
 
 TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {