From 43b7f8fda6215e62a78a862c9e2e35c3c71c2e03 Mon Sep 17 00:00:00 2001 From: David Watrous <509299+dpwatrous@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:01:11 -0500 Subject: [PATCH] Initial Track 2 conversion --- Java/PoolAndResourceFile/pom.xml | 27 +- .../src/main/java/PoolAndResourceFile.java | 325 +++++++++--------- 2 files changed, 172 insertions(+), 180 deletions(-) diff --git a/Java/PoolAndResourceFile/pom.xml b/Java/PoolAndResourceFile/pom.xml index 3c1374b6..99d97180 100644 --- a/Java/PoolAndResourceFile/pom.xml +++ b/Java/PoolAndResourceFile/pom.xml @@ -15,29 +15,14 @@ - com.microsoft.azure - azure-batch - 9.0.0 + com.azure + azure-compute-batch + 1.0.0-beta.1 - com.microsoft.rest - client-runtime - 1.7.12 - - - com.microsoft.azure - azure-client-runtime - 1.7.12 - - - commons-codec - commons-codec - 1.15 - - - com.microsoft.azure - azure-storage - 8.6.6 + com.azure + azure-storage-blob + 12.21.1 diff --git a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java index 7fb9f284..a722f122 100644 --- a/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java +++ b/Java/PoolAndResourceFile/src/main/java/PoolAndResourceFile.java @@ -1,18 +1,29 @@ -import java.io.*; -import java.net.URISyntaxException; -import java.security.InvalidKeyException; +import com.azure.compute.batch.BatchClient; +import com.azure.compute.batch.BatchClientBuilder; +import com.azure.compute.batch.models.*; +import com.azure.core.credential.AzureNamedKeyCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.rest.PagedIterable; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.sas.BlobSasPermission; +import com.azure.storage.blob.sas.BlobServiceSasSignatureValues; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.common.StorageSharedKeyCredential; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; -import java.util.*; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import com.microsoft.azure.storage.*; -import com.microsoft.azure.storage.blob.*; - -import com.microsoft.azure.batch.*; -import com.microsoft.azure.batch.auth.*; -import com.microsoft.azure.batch.protocol.models.*; - public class PoolAndResourceFile { // Get Batch and storage account information from environment @@ -33,9 +44,14 @@ public class PoolAndResourceFile { static boolean CLEANUP_JOB = true; static boolean CLEANUP_POOL = true; - public static void main(String[] argv) throws Exception { - BatchClient client = BatchClient.open(new BatchSharedKeyCredentials(BATCH_URI, BATCH_ACCOUNT, BATCH_ACCESS_KEY)); - CloudBlobContainer container = createBlobContainerIfNotExists(STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME); + public static void main(String[] argv) { + BatchClient client = new BatchClientBuilder() + .endpoint(BATCH_URI) + .credential(new AzureNamedKeyCredential(BATCH_ACCOUNT, BATCH_ACCESS_KEY)) + .buildClient(); + + BlobContainerClient containerClient = createBlobContainerIfNotExists( + STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME); String userName = System.getProperty("user.name"); String poolId = userName + "-pooltest"; @@ -43,33 +59,34 @@ public static void main(String[] argv) throws Exception { new Date().toString().replaceAll("(\\.|:|\\s)", "-"); try { - CloudPool sharedPool = createPoolIfNotExists(client, poolId); + BatchPool sharedPool = createPoolIfNotExists(client, poolId); // Submit a job and wait for completion - submitJob(client, container, sharedPool.id(), jobId, TASK_COUNT); + submitJob(client, containerClient, sharedPool.getId(), jobId, TASK_COUNT); waitForTasksToComplete(client, jobId, Duration.ofMinutes(5)); System.out.println("\nTask Results"); System.out.println("------------------------------------------------------"); - List tasks = client.taskOperations().listTasks(jobId); - for (CloudTask task : tasks) { - if (task.executionInfo().failureInfo() != null) { - System.out.println("Task " + task.id() + " failed: " + task.executionInfo().failureInfo().message()); + PagedIterable tasks = client.listTasks(jobId); + for (BatchTask task : tasks) { + BatchTaskExecutionInfo execution = task.getExecutionInfo(); + + if (execution.getFailureInfo() != null) { + System.out.println("Task " + task.getId() + " failed: " + execution.getFailureInfo().getMessage()); } - String outputFileName = task.executionInfo().exitCode() == 0 ? "stdout.txt" : "stderr.txt"; - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - client.fileOperations().getFileFromTask(jobId, task.id(), outputFileName, stream); - String fileContent = stream.toString("UTF-8"); + String outputFileName = execution.getExitCode() == 0 ? "stdout.txt" : "stderr.txt"; + String fileContent = client.getTaskFile(jobId, task.getId(), outputFileName).toString(); - System.out.println("\nTask " + task.id() + " output (" + outputFileName + "):"); + System.out.println("\nTask " + task.getId() + " output (" + outputFileName + "):"); System.out.println(fileContent); } System.out.println("------------------------------------------------------\n"); - } catch (BatchErrorException err) { - printBatchException(err); + // TODO: How do we replace BatchErrorException? + // } catch (BatchErrorException err) { + // printBatchException(err); } catch (Exception ex) { ex.printStackTrace(); } finally { @@ -77,22 +94,22 @@ public static void main(String[] argv) throws Exception { if (CLEANUP_JOB) { try { System.out.println("Deleting job " + jobId); - client.jobOperations().deleteJob(jobId); - } catch (BatchErrorException err) { + client.deleteJob(jobId); + } catch (HttpResponseException err) { printBatchException(err); } } if (CLEANUP_POOL) { try { System.out.println("Deleting pool " + poolId); - client.poolOperations().deletePool(poolId); - } catch (BatchErrorException err) { + client.deletePool(poolId); + } catch (HttpResponseException err) { printBatchException(err); } } if (CLEANUP_STORAGE_CONTAINER) { - System.out.println("Deleting storage container " + container.getName()); - container.deleteIfExists(); + System.out.println("Deleting storage container " + containerClient.getBlobContainerName()); + containerClient.deleteIfExists(); } } @@ -103,53 +120,49 @@ public static void main(String[] argv) throws Exception { /** * Create a pool if one doesn't already exist with the given ID * - * @param client The Batch client - * @param poolId The ID of the pool to create or look up - * - * @return A newly created or existing pool + * @param client The Batch client + * @param poolId The ID of the pool to create or look up + * @return A newly created or existing pool */ - private static CloudPool createPoolIfNotExists(BatchClient client, String poolId) - throws BatchErrorException, IllegalArgumentException, IOException, InterruptedException, TimeoutException { - // Create a pool with 1 A1 VM - String osPublisher = "OpenLogic"; - String osOffer = "CentOS"; - String poolVMSize = "STANDARD_A1"; - int poolVMCount = 1; + private static BatchPool createPoolIfNotExists(BatchClient client, String poolId) + throws InterruptedException, TimeoutException { + // Create a pool with a single node + String osPublisher = "canonical"; + String osOffer = "0001-com-ubuntu-server-jammy"; + String vmSize = "Standard_A1_v2"; + int targetNodeCount = 1; Duration poolSteadyTimeout = Duration.ofMinutes(5); - Duration vmReadyTimeout = Duration.ofMinutes(20); + Duration nodeReadyTimeout = Duration.ofMinutes(20); // If the pool exists and is active (not being deleted), resize it - if (client.poolOperations().existsPool(poolId) && client.poolOperations().getPool(poolId).state().equals(PoolState.ACTIVE)) { - System.out.println("Pool " + poolId + " already exists: Resizing to " + poolVMCount + " dedicated node(s)"); - client.poolOperations().resizePool(poolId, NODE_COUNT, 0); + if (client.poolExists(poolId) && client.getPool(poolId).getState().equals(BatchPoolState.ACTIVE)) { + System.out.println("Pool " + poolId + " already exists: Resizing to " + targetNodeCount + " dedicated node(s)"); + client.resizePool(poolId, new BatchPoolResizeParameters().setTargetDedicatedNodes(NODE_COUNT)); } else { - System.out.println("Creating pool " + poolId + " with " + poolVMCount + " dedicated node(s)"); - - // See detail of creating IaaS pool at - // https://blogs.technet.microsoft.com/windowshpc/2016/03/29/introducing-linux-support-on-azure-batch/ - // Get the sku image reference - List skus = client.accountOperations().listSupportedImages(); - String skuId = null; - ImageReference imageRef = null; - - for (ImageInformation sku : skus) { - if (sku.osType() == OSType.LINUX) { - if (sku.verificationType() == VerificationType.VERIFIED) { - if (sku.imageReference().publisher().equalsIgnoreCase(osPublisher) - && sku.imageReference().offer().equalsIgnoreCase(osOffer)) { - imageRef = sku.imageReference(); - skuId = sku.nodeAgentSKUId(); - break; - } - } + System.out.println("Creating pool " + poolId + " with " + targetNodeCount + " dedicated node(s)"); + + String nodeAgentSku = null; + ImageReference image = null; + for (ImageInfo sku : client.listSupportedImages()) { + image = sku.getImageReference(); + nodeAgentSku = sku.getNodeAgentSkuId(); + if (sku.getOsType() == OSType.LINUX + && sku.getVerificationType().equals(ImageVerificationType.VERIFIED) + && image.getPublisher().equalsIgnoreCase(osPublisher) + && image.getOffer().equalsIgnoreCase(osOffer)) { + // Found a matching verified image + break; } } - // Use IaaS VM with Linux - VirtualMachineConfiguration configuration = new VirtualMachineConfiguration(); - configuration.withNodeAgentSKUId(skuId).withImageReference(imageRef); + if (nodeAgentSku == null || image == null) { + throw new IllegalArgumentException( + String.format("Unable to find a verified image with publisher '%s' and offer '%s'", osPublisher, osOffer)); + } - client.poolOperations().createPool(poolId, poolVMSize, configuration, poolVMCount); + client.createPool(new BatchPoolCreateParameters(poolId, vmSize) + .setVirtualMachineConfiguration(new VirtualMachineConfiguration(image, nodeAgentSku)) + .setTargetDedicatedNodes(targetNodeCount)); } long startTime = System.currentTimeMillis(); @@ -159,8 +172,8 @@ private static CloudPool createPoolIfNotExists(BatchClient client, String poolId // Wait for the VM to be allocated System.out.print("Waiting for pool to resize."); while (elapsedTime < poolSteadyTimeout.toMillis()) { - CloudPool pool = client.poolOperations().getPool(poolId); - if (pool.allocationState() == AllocationState.STEADY) { + BatchPool pool = client.getPool(poolId); + if (pool.getAllocationState() == AllocationState.STEADY) { steady = true; break; } @@ -179,17 +192,21 @@ private static CloudPool createPoolIfNotExists(BatchClient client, String poolId // The following code is just an example of how to poll for the VM state startTime = System.currentTimeMillis(); elapsedTime = 0L; - boolean hasIdleVM = false; - - // Wait for at least 1 VM to reach the IDLE state - System.out.print("Waiting for VMs to start."); - while (elapsedTime < vmReadyTimeout.toMillis()) { - List nodeCollection = client.computeNodeOperations().listComputeNodes(poolId, - new DetailLevel.Builder().withSelectClause("id, state").withFilterClause("state eq 'idle'") - .build()); - if (!nodeCollection.isEmpty()) { - hasIdleVM = true; - break; + boolean hasIdleNode = false; + + // Wait for at least 1 node to reach the idle state + System.out.print("Waiting for nodes to start."); + while (elapsedTime < nodeReadyTimeout.toMillis()) { + PagedIterable nodes = client.listNodes(poolId, new ListBatchNodesOptions() + .setSelect(Arrays.asList("id", "state")) + .setFilter("state eq 'idle'") + .setMaxresults(1)); + + for (BatchNode node : nodes) { + if (node != null) { + hasIdleNode = true; + break; + } } System.out.print("."); @@ -198,124 +215,113 @@ private static CloudPool createPoolIfNotExists(BatchClient client, String poolId } System.out.println(); - if (!hasIdleVM) { + if (!hasIdleNode) { throw new TimeoutException("The node did not reach an IDLE state in the allotted time"); } - return client.poolOperations().getPool(poolId); + return client.getPool(poolId); } /** * Create blob container in order to upload file - * - * @param storageAccountName The name of the storage account to create or look up - * @param storageAccountKey An SAS key for accessing the storage account * - * @return A newly created or existing storage container + * @param storageAccountName The name of the storage account to create or look up + * @param storageAccountKey An SAS key for accessing the storage account + * @return A newly created or existing storage container */ - private static CloudBlobContainer createBlobContainerIfNotExists(String storageAccountName, String storageAccountKey, String containerName) - throws URISyntaxException, StorageException { + private static BlobContainerClient createBlobContainerIfNotExists(String storageAccountName, String storageAccountKey, String containerName) { System.out.println("Creating storage container " + containerName); - StorageCredentials credentials = new StorageCredentialsAccountAndKey(storageAccountName, storageAccountKey); - CloudBlobClient blobClient = new CloudStorageAccount(credentials, true).createCloudBlobClient(); - CloudBlobContainer container = blobClient.getContainerReference(containerName); - container.createIfNotExists(); + BlobServiceClient blobClient = new BlobServiceClientBuilder() + .endpoint(String.format("https://%s.blob.core.windows.net/", storageAccountName)) + .credential(new StorageSharedKeyCredential(storageAccountName, storageAccountKey)) + .buildClient(); - return container; + blobClient.createBlobContainerIfNotExists(containerName); + + return blobClient.getBlobContainerClient(containerName); } /** * Upload a file to a blob container and return an SAS key * - * @param container The container to upload to - * @param source The local file to upload - * + * @param containerClient The blob container client to use + * @param source The local file to upload * @return An SAS key for the uploaded file */ - private static String uploadFileToCloud(CloudBlobContainer container, File source) - throws URISyntaxException, IOException, InvalidKeyException, StorageException { - CloudBlockBlob blob = container.getBlockBlobReference(source.getName()); - blob.upload(new FileInputStream(source), source.length()); - - // Set SAS expiry time to 1 day from now - SharedAccessBlobPolicy policy = new SharedAccessBlobPolicy(); - EnumSet perEnumSet = EnumSet.of(SharedAccessBlobPermissions.READ); - policy.setPermissions(perEnumSet); - Calendar cal = Calendar.getInstance(); - cal.setTime(new Date()); - cal.add(Calendar.DATE, 1); - policy.setSharedAccessExpiryTime(cal.getTime()); - - // Create SAS key - String sas = blob.generateSharedAccessSignature(policy, null); - - return blob.getUri() + "?" + sas; + private static String uploadFileToStorage(BlobContainerClient containerClient, File source) throws IOException { + BlockBlobClient blobClient = containerClient.getBlobClient(source.getName()).getBlockBlobClient(); + blobClient.upload(Files.newInputStream(source.toPath()), source.length()); + + // Create SAS with expiry time of 1 day + String sas = blobClient.generateSas(new BlobServiceSasSignatureValues( + OffsetDateTime.now().plusDays(1), + new BlobSasPermission().setReadPermission(true) + )); + + return blobClient.getBlobUrl() + "?" + sas; } /** * Create a job and add some tasks - * - * @param client The Batch client - * @param container A blob container to upload resource files - * @param poolId The ID of the pool to submit a job - * @param jobId A unique ID for the new job - * @param taskCount How many tasks to add + * + * @param client The Batch client + * @param containerClient A blob container to upload resource files + * @param poolId The ID of the pool to submit a job + * @param jobId A unique ID for the new job + * @param taskCount How many tasks to add */ - private static void submitJob(BatchClient client, CloudBlobContainer container, String poolId, - String jobId, int taskCount) - throws BatchErrorException, IOException, StorageException, InvalidKeyException, InterruptedException, URISyntaxException { + private static void submitJob(BatchClient client, BlobContainerClient containerClient, String poolId, + String jobId, int taskCount) throws IOException, InterruptedException { System.out.println("Submitting job " + jobId + " with " + taskCount + " tasks"); // Create job - PoolInformation poolInfo = new PoolInformation(); - poolInfo.withPoolId(poolId); - client.jobOperations().createJob(jobId, poolInfo); + BatchPoolInfo poolInfo = new BatchPoolInfo(); + poolInfo.setPoolId(poolId); + client.createJob(new BatchJobCreateParameters(jobId, poolInfo)); // Upload a resource file and make it available in a "resources" subdirectory on nodes String fileName = "test.txt"; String localPath = "./" + fileName; String remotePath = "resources/" + fileName; - String signedUrl = uploadFileToCloud(container, new File(localPath)); + String signedUrl = uploadFileToStorage(containerClient, new File(localPath)); List files = new ArrayList<>(); files.add(new ResourceFile() - .withHttpUrl(signedUrl) - .withFilePath(remotePath)); + .setHttpUrl(signedUrl) + .setFilePath(remotePath)); // Create tasks - List tasks = new ArrayList<>(); + List tasks = new ArrayList<>(); for (int i = 0; i < taskCount; i++) { - tasks.add(new TaskAddParameter() - .withId("mytask" + i) - .withCommandLine("cat " + remotePath) - .withResourceFiles(files)); + tasks.add(new BatchTaskCreateParameters("mytask" + i, "cat " + remotePath) + .setResourceFiles(files)); } // Add the tasks to the job - client.taskOperations().createTasks(jobId, tasks); + client.createTasks(jobId, tasks); } /** * Wait for all tasks in a given job to be completed, or throw an exception on timeout - * - * @param client The Batch client - * @param jobId The ID of the job to poll for completion. - * @param timeout How long to wait for the job to complete before giving up + * + * @param client The Batch client + * @param jobId The ID of the job to poll for completion. + * @param timeout How long to wait for the job to complete before giving up */ private static void waitForTasksToComplete(BatchClient client, String jobId, Duration timeout) - throws BatchErrorException, IOException, InterruptedException, TimeoutException { + throws InterruptedException, TimeoutException { long startTime = System.currentTimeMillis(); long elapsedTime = 0L; System.out.print("Waiting for tasks to complete (Timeout: " + timeout.getSeconds() / 60 + "m)"); while (elapsedTime < timeout.toMillis()) { - List taskCollection = client.taskOperations().listTasks(jobId, - new DetailLevel.Builder().withSelectClause("id, state").build()); + PagedIterable taskCollection = client.listTasks(jobId, + new ListBatchTasksOptions().setSelect(Arrays.asList("id", "state"))); boolean allComplete = true; - for (CloudTask task : taskCollection) { - if (task.state() != TaskState.COMPLETED) { + for (BatchTask task : taskCollection) { + if (task.getState() != BatchTaskState.COMPLETED) { allComplete = false; break; } @@ -338,17 +344,18 @@ private static void waitForTasksToComplete(BatchClient client, String jobId, Dur throw new TimeoutException("Task did not complete within the specified timeout"); } - private static void printBatchException(BatchErrorException err) { - System.out.printf("BatchError %s%n", err.toString()); - if (err.body() != null) { - System.out.printf("BatchError code = %s, message = %s%n", err.body().code(), - err.body().message().value()); - if (err.body().values() != null) { - for (BatchErrorDetail detail : err.body().values()) { - System.out.printf("Detail %s=%s%n", detail.key(), detail.value()); - } - } - } + private static void printBatchException(HttpResponseException err) { + // TODO: How do we get error details? + System.out.printf("HTTP Response error %s%n", err.toString()); +// if (err.body() != null) { +// System.out.printf("BatchError code = %s, message = %s%n", err.body().code(), +// err.body().message().value()); +// if (err.body().values() != null) { +// for (BatchErrorDetail detail : err.body().values()) { +// System.out.printf("Detail %s=%s%n", detail.key(), detail.value()); +// } +// } +// } } }