From 9b5ba986707202cd1727df0603b768d0a3764cbb Mon Sep 17 00:00:00 2001 From: Tetsuro Sano Date: Wed, 4 Dec 2024 22:51:27 +0900 Subject: [PATCH] Migrate for Embulk v0.1x --- .github/workflows/gem-push.yml | 6 +- build.gradle | 140 +++++------------- gradle.lockfile | 31 ++++ gradle/wrapper/gradle-wrapper.properties | 2 +- lib/embulk/input/cloudwatch_logs.rb | 3 - .../AbstractCloudwatchLogsInputPlugin.java | 95 ++++++------ .../CloudwatchLogsInputPlugin.java | 10 +- .../cloudwatch_logs/aws/AwsCredentials.java | 3 +- .../aws/AwsCredentialsConfig.java | 2 +- .../aws/AwsCredentialsTask.java | 6 +- .../cloudwatch_logs/TestAwsCredentials.java | 61 ++++---- .../TestCloudwatchLogsInputPlugin.java | 67 ++++----- .../input/cloudwatch_logs/TestHelpers.java | 20 +-- 13 files changed, 201 insertions(+), 245 deletions(-) create mode 100644 gradle.lockfile delete mode 100644 lib/embulk/input/cloudwatch_logs.rb diff --git a/.github/workflows/gem-push.yml b/.github/workflows/gem-push.yml index 5345b86..d51f9e8 100644 --- a/.github/workflows/gem-push.yml +++ b/.github/workflows/gem-push.yml @@ -14,14 +14,14 @@ jobs: packages: write contents: read steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Ruby 2.7 uses: ruby/setup-ruby@v1 with: ruby-version: 2.7 - name: push gem - uses: trocco-io/push-gem-to-gpr-action@v1 + uses: trocco-io/push-gem-to-gpr-action@v2 with: language: java - gem-path: "./pkg/*.gem" + gem-path: "./build/gems/*.gem" github-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/build.gradle b/build.gradle index fbda876..0fd8ad7 100644 --- a/build.gradle +++ b/build.gradle @@ -1,134 +1,64 @@ plugins { - id "com.jfrog.bintray" version "1.1" - id "com.github.jruby-gradle.base" version "1.5.0" id "java" id "checkstyle" - id "com.palantir.git-version" version "3.0.0" + id "org.embulk.embulk-plugins" version "0.7.0" + id "com.palantir.git-version" version "3.1.0" } -import com.github.jrubygradle.JRubyExec repositories { mavenCentral() - jcenter() -} -configurations { - provided } +group = "trocco-io" version = { def vd = versionDetails() - if (vd.commitDistance == 0 && vd.lastTag ==~ /^v?[0-9]+\.[0-9]+\.[0-9]+([.-][a-zA-Z0-9.-]+)?/) { - vd.lastTag.replaceFirst(/^v/, "") + if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+([.-][.a-zA-Z0-9-]+)?/) { + vd.lastTag } else { "0.0.0.${vd.gitHash}" } }() +description = "Loads records from Cloudwatch Logs." -sourceCompatibility = 1.8 -targetCompatibility = 1.8 - -dependencies { - compile "org.embulk:embulk-core:0.10.1" - provided "org.embulk:embulk-core:0.10.1" - implementation "com.google.guava:guava:28.2-jre" - compile group: 'com.amazonaws', name: 'aws-java-sdk-logs', version: '1.11.749' - compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.11.749' - // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" - testCompile "junit:junit:4.+" - testCompile "org.mockito:mockito-core:1.+" - testCompile "org.embulk:embulk-core:0.10.1:tests" - testCompile "org.embulk:embulk-standards:0.10.1" - testCompile "org.embulk:embulk-junit4:0.10.1" - testCompile "org.embulk:embulk-deps-buffer:0.10.1" - testCompile "org.embulk:embulk-deps-config:0.10.1" +java { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 } -// add tests/javadoc/source jar tasks as artifacts to be released -task testsJar(type: Jar, dependsOn: classes) { - classifier = 'tests' - from sourceSets.test.output -} -task sourcesJar(type: Jar, dependsOn: classes) { - classifier = 'sources' - from sourceSets.main.allSource -} -task javadocJar(type: Jar, dependsOn: javadoc) { - classifier = 'javadoc' - from javadoc.destinationDir -} - -task classpath(type: Copy, dependsOn: ["jar"]) { - doFirst { file("classpath").deleteDir() } - from (configurations.runtime - configurations.provided + files(jar.archivePath)) - into "classpath" +configurations { + compileClasspath.resolutionStrategy.activateDependencyLocking() + runtimeClasspath.resolutionStrategy.activateDependencyLocking() } -clean { delete "classpath" } -checkstyle { - configFile = file("${project.rootDir}/config/checkstyle/checkstyle.xml") - toolVersion = '6.14.1' -} -checkstyleMain { - configFile = file("${project.rootDir}/config/checkstyle/default.xml") - ignoreFailures = true -} -checkstyleTest { - configFile = file("${project.rootDir}/config/checkstyle/default.xml") - ignoreFailures = true -} -task checkstyle(type: Checkstyle) { - classpath = sourceSets.main.output + sourceSets.test.output - source = sourceSets.main.allJava + sourceSets.test.allJava -} +dependencies { + compileOnly "org.embulk:embulk-spi:0.11" -task gem(type: JRubyExec, dependsOn: ["gemspec", "classpath"]) { - jrubyArgs "-S" - script "gem" - scriptArgs "build", "${project.name}.gemspec" - doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") } -} + implementation "org.embulk:embulk-util-config:0.3.4" + implementation "com.google.guava:guava:28.2-jre" + implementation "com.amazonaws:aws-java-sdk-logs:1.11.749" + implementation "com.amazonaws:aws-java-sdk-sts:1.11.749" -task gemPush(type: JRubyExec, dependsOn: ["gem"]) { - jrubyArgs "-S" - script "gem" - scriptArgs "push", "pkg/${project.name}-${project.version}.gem" + testImplementation "junit:junit:4.+" + testImplementation "org.mockito:mockito-core:1.+" + testImplementation "org.embulk:embulk-core:0.11.5" + testImplementation "org.embulk:embulk-deps:0.11.5" + testImplementation "org.embulk:embulk-junit4:0.11.5" } -task "package"(dependsOn: ["gemspec", "classpath"]) { - doLast { - println "> Build succeeded." - println "> You can run embulk with '-L ${file(".").absolutePath}' argument." - } +tasks.withType(JavaCompile).configureEach { + options.compilerArgs << "-Xlint:all" << "-Xlint:-serial" } -artifacts { - archives testsJar, sourcesJar, javadocJar +embulkPlugin { + mainClass = "org.embulk.input.cloudwatch_logs.CloudwatchLogsInputPlugin" + category = "input" + type = "cloudwatch_logs" } -task gemspec { - ext.gemspecFile = file("${project.name}.gemspec") - inputs.file "build.gradle" - outputs.file gemspecFile - doLast { gemspecFile.write($/ -Gem::Specification.new do |spec| - spec.name = "${project.name}" - spec.version = "${project.version}" - spec.authors = ["Hiroshi Hatake"] - spec.summary = %[Cloudwatch Logs input plugin for Embulk] - spec.description = %[Loads records from Cloudwatch Logs.] - spec.email = ["cosmo0920.wp@gmail.com"] - spec.licenses = ["MIT"] - spec.homepage = "https://github.com/cosmo0920/embulk-input-cloudwatch_logs" - - spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"] - spec.test_files = spec.files.grep(%r"^(test|spec)/") - spec.require_paths = ["lib"] - - #spec.add_dependency 'YOUR_GEM_DEPENDENCY', ['~> YOUR_GEM_DEPENDENCY_VERSION'] - spec.add_development_dependency 'bundler', ['~> 1.0'] - spec.add_development_dependency 'rake', ['~> 12.0'] -end -/$) - } +gem { + authors = [ "Hiroshi Hatake" ] + email = [ "cosmo0920.wp@gmail.com" ] + summary = "Cloudwatch Logs input plugin for Embulk" + homepage = "https://github.com/trocco-io/embulk-input-cloudwatch_logs" + licenses = [ "MIT" ] } -clean { delete "${project.name}.gemspec" } diff --git a/gradle.lockfile b/gradle.lockfile new file mode 100644 index 0000000..941f7f8 --- /dev/null +++ b/gradle.lockfile @@ -0,0 +1,31 @@ +# This is a Gradle generated file for dependency locking. +# Manual edits can break the build and are not advised. +# This file is expected to be part of source control. +com.amazonaws:aws-java-sdk-core:1.11.749=compileClasspath,runtimeClasspath +com.amazonaws:aws-java-sdk-logs:1.11.749=compileClasspath,runtimeClasspath +com.amazonaws:aws-java-sdk-sts:1.11.749=compileClasspath,runtimeClasspath +com.amazonaws:jmespath-java:1.11.749=compileClasspath,runtimeClasspath +com.fasterxml.jackson.core:jackson-annotations:2.6.7=compileClasspath,runtimeClasspath +com.fasterxml.jackson.core:jackson-core:2.6.7=compileClasspath,runtimeClasspath +com.fasterxml.jackson.core:jackson-databind:2.6.7.5=compileClasspath,runtimeClasspath +com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.6.7=compileClasspath,runtimeClasspath +com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7=compileClasspath,runtimeClasspath +com.google.code.findbugs:jsr305:3.0.2=compileClasspath,runtimeClasspath +com.google.errorprone:error_prone_annotations:2.3.4=compileClasspath,runtimeClasspath +com.google.guava:failureaccess:1.0.1=compileClasspath,runtimeClasspath +com.google.guava:guava:28.2-jre=compileClasspath,runtimeClasspath +com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava=compileClasspath,runtimeClasspath +com.google.j2objc:j2objc-annotations:1.3=compileClasspath,runtimeClasspath +commons-codec:commons-codec:1.11=compileClasspath,runtimeClasspath +commons-logging:commons-logging:1.2=compileClasspath,runtimeClasspath +javax.validation:validation-api:1.1.0.Final=compileClasspath,runtimeClasspath +joda-time:joda-time:2.8.1=compileClasspath,runtimeClasspath +org.apache.httpcomponents:httpclient:4.5.9=compileClasspath,runtimeClasspath +org.apache.httpcomponents:httpcore:4.4.11=compileClasspath,runtimeClasspath +org.checkerframework:checker-qual:2.10.0=compileClasspath,runtimeClasspath +org.embulk:embulk-spi:0.11=compileClasspath +org.embulk:embulk-util-config:0.3.4=compileClasspath,runtimeClasspath +org.msgpack:msgpack-core:0.8.24=compileClasspath +org.slf4j:slf4j-api:2.0.7=compileClasspath +software.amazon.ion:ion-java:1.0.2=compileClasspath,runtimeClasspath +empty= diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b7c8c5d..48c0a02 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/lib/embulk/input/cloudwatch_logs.rb b/lib/embulk/input/cloudwatch_logs.rb deleted file mode 100644 index 2c5b0da..0000000 --- a/lib/embulk/input/cloudwatch_logs.rb +++ /dev/null @@ -1,3 +0,0 @@ -Embulk::JavaPlugin.register_input( - "cloudwatch_logs", "org.embulk.input.cloudwatch_logs.CloudwatchLogsInputPlugin", - File.expand_path('../../../../classpath', __FILE__)) diff --git a/src/main/java/org/embulk/input/cloudwatch_logs/AbstractCloudwatchLogsInputPlugin.java b/src/main/java/org/embulk/input/cloudwatch_logs/AbstractCloudwatchLogsInputPlugin.java index 99548aa..86f0c01 100644 --- a/src/main/java/org/embulk/input/cloudwatch_logs/AbstractCloudwatchLogsInputPlugin.java +++ b/src/main/java/org/embulk/input/cloudwatch_logs/AbstractCloudwatchLogsInputPlugin.java @@ -1,35 +1,9 @@ package org.embulk.input.cloudwatch_logs; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; - -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; - -import org.embulk.config.ConfigDiff; -import org.embulk.config.ConfigException; -import org.embulk.config.ConfigSource; -import org.embulk.config.Task; -import org.embulk.config.TaskReport; -import org.embulk.config.TaskSource; -import org.embulk.spi.Exec; -import org.embulk.spi.InputPlugin; -import org.embulk.spi.PageBuilder; -import org.embulk.spi.PageOutput; -import org.embulk.spi.Schema; -import org.embulk.spi.time.Timestamp; -import org.embulk.spi.type.Types; - import com.amazonaws.AmazonServiceException; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.retry.PredefinedRetryPolicies; - import com.amazonaws.services.logs.AWSLogs; import com.amazonaws.services.logs.AWSLogsClientBuilder; import com.amazonaws.services.logs.model.DescribeLogStreamsRequest; @@ -38,14 +12,38 @@ import com.amazonaws.services.logs.model.GetLogEventsResult; import com.amazonaws.services.logs.model.LogStream; import com.amazonaws.services.logs.model.OutputLogEvent; - +import com.google.common.annotations.VisibleForTesting; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; import org.embulk.input.cloudwatch_logs.aws.AwsCredentials; import org.embulk.input.cloudwatch_logs.aws.AwsCredentialsTask; import org.embulk.input.cloudwatch_logs.utils.DateUtils; +import org.embulk.spi.Exec; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.PageOutput; +import org.embulk.spi.Schema; +import org.embulk.spi.type.Types; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.config.ConfigMapper; +import org.embulk.util.config.ConfigMapperFactory; +import org.embulk.util.config.Task; +import org.embulk.util.config.TaskMapper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Optional; public abstract class AbstractCloudwatchLogsInputPlugin implements InputPlugin { + private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; public interface PluginTask @@ -85,22 +83,23 @@ public interface PluginTask public ConfigDiff transaction(ConfigSource config, InputPlugin.Control control) { - PluginTask task = config.loadConfig(getTaskClass()); + ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); + PluginTask task = configMapper.map(config, getTaskClass()); Schema schema = new Schema.Builder() .add("timestamp", Types.TIMESTAMP) .add(task.getColumnName(), Types.STRING) .build(); int taskCount = 1; // number of run() method calls - String time_range_format = DEFAULT_DATE_FORMAT; + String timeRangeFormat = DEFAULT_DATE_FORMAT; if (task.getTimeRangeFormat().isPresent()) { - time_range_format = task.getTimeRangeFormat().get(); + timeRangeFormat = task.getTimeRangeFormat().get(); } if (task.getStartTime().isPresent() && task.getEndTime().isPresent()) { Date startTime = DateUtils.parseDateStr(task.getStartTime().get(), - Collections.singletonList(time_range_format)); + Collections.singletonList(timeRangeFormat)); Date endTime = DateUtils.parseDateStr(task.getEndTime().get(), - Collections.singletonList(time_range_format)); + Collections.singletonList(timeRangeFormat)); if (endTime.before(startTime)) { throw new ConfigException(String.format("endTime(%s) must not be earlier than startTime(%s).", task.getEndTime().get(), @@ -108,7 +107,7 @@ public ConfigDiff transaction(ConfigSource config, } } - return resume(task.dump(), schema, taskCount, control); + return resume(task.toTaskSource(), schema, taskCount, control); } @Override @@ -117,7 +116,7 @@ public ConfigDiff resume(TaskSource taskSource, InputPlugin.Control control) { control.run(taskSource, schema, taskCount); - return Exec.newConfigDiff(); + return CONFIG_MAPPER_FACTORY.newConfigDiff(); } @Override @@ -133,12 +132,13 @@ public TaskReport run(TaskSource taskSource, Schema schema, int taskIndex, PageOutput output) { - PluginTask task = taskSource.loadTask(getTaskClass()); + TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper(); + PluginTask task = taskMapper.map(taskSource, getTaskClass()); AWSLogs client = newLogsClient(task); CloudWatchLogsDrainer drainer = new CloudWatchLogsDrainer(task, client); if (task.getUseLogStreamNamePrefix()) { - List defaultLogStream = new ArrayList(); + List defaultLogStream = new ArrayList<>(); List logStreams = drainer.describeLogStreams(defaultLogStream, null); try (final PageBuilder pageBuilder = getPageBuilder(schema, output)) { for (LogStream stream : logStreams) { @@ -161,9 +161,10 @@ public TaskReport run(TaskSource taskSource, } } - return Exec.newTaskReport(); + return CONFIG_MAPPER_FACTORY.newTaskReport(); } + @SuppressWarnings("deprecation") // TODO: For compatibility with Embulk v0.9 private void addRecords(CloudWatchLogsDrainer drainer, PageBuilder pageBuilder, String logStreamName) { String nextToken = null; @@ -171,7 +172,8 @@ private void addRecords(CloudWatchLogsDrainer drainer, PageBuilder pageBuilder, GetLogEventsResult result = drainer.getEvents(logStreamName, nextToken); List events = result.getEvents(); for (OutputLogEvent event : events) { - pageBuilder.setTimestamp(0, Timestamp.ofEpochMilli(event.getTimestamp())); + // TODO: Use Instant instead of Timestamp + pageBuilder.setTimestamp(0, org.embulk.spi.time.Timestamp.ofEpochMilli(event.getTimestamp())); pageBuilder.setString(1, event.getMessage()); pageBuilder.addRecord(); @@ -231,13 +233,14 @@ protected ClientConfiguration getClientConfiguration(PluginTask task) @Override public ConfigDiff guess(ConfigSource config) { - return Exec.newConfigDiff(); + return CONFIG_MAPPER_FACTORY.newConfigDiff(); } @VisibleForTesting + @SuppressWarnings("deprecation") // TODO: For compatibility with Embulk v0.9 public PageBuilder getPageBuilder(final Schema schema, final PageOutput output) { - return new PageBuilder(Exec.getBufferAllocator(), schema, output); + return new PageBuilder(Exec.getBufferAllocator(), schema, output); // TODO: Use Exec#getPageBuilder } @VisibleForTesting @@ -260,18 +263,18 @@ private GetLogEventsResult getEvents(String logStreamName, String nextToken) .withLogGroupName(logGroupName) .withLogStreamName(logStreamName) .withStartFromHead(true); - String time_range_format = DEFAULT_DATE_FORMAT; + String timeRangeFormat = DEFAULT_DATE_FORMAT; if (task.getTimeRangeFormat().isPresent()) { - time_range_format = task.getTimeRangeFormat().get(); + timeRangeFormat = task.getTimeRangeFormat().get(); } if (task.getStartTime().isPresent()) { String startTimeStr = task.getStartTime().get(); - Date startTime = DateUtils.parseDateStr(startTimeStr, Collections.singletonList(time_range_format)); + Date startTime = DateUtils.parseDateStr(startTimeStr, Collections.singletonList(timeRangeFormat)); request.setStartTime(startTime.getTime()); } if (task.getEndTime().isPresent()) { String endTimeStr = task.getEndTime().get(); - Date endTime = DateUtils.parseDateStr(endTimeStr, Collections.singletonList(time_range_format)); + Date endTime = DateUtils.parseDateStr(endTimeStr, Collections.singletonList(timeRangeFormat)); request.setEndTime(endTime.getTime()); } if (nextToken != null) { @@ -309,9 +312,7 @@ private List describeLogStreams(List logStreams, String ne DescribeLogStreamsResult response = client.describeLogStreams(request); if (!logStreams.isEmpty()) { - for (LogStream stream : response.getLogStreams()) { - logStreams.add(stream); - } + logStreams.addAll(response.getLogStreams()); } else { logStreams = response.getLogStreams(); diff --git a/src/main/java/org/embulk/input/cloudwatch_logs/CloudwatchLogsInputPlugin.java b/src/main/java/org/embulk/input/cloudwatch_logs/CloudwatchLogsInputPlugin.java index 874ea3a..13bc4d2 100644 --- a/src/main/java/org/embulk/input/cloudwatch_logs/CloudwatchLogsInputPlugin.java +++ b/src/main/java/org/embulk/input/cloudwatch_logs/CloudwatchLogsInputPlugin.java @@ -1,12 +1,12 @@ package org.embulk.input.cloudwatch_logs; -import com.amazonaws.services.logs.AWSLogsClientBuilder; import com.amazonaws.services.logs.AWSLogs; -import com.google.common.base.Optional; - -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; +import com.amazonaws.services.logs.AWSLogsClientBuilder; import org.embulk.config.ConfigException; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; + +import java.util.Optional; public class CloudwatchLogsInputPlugin extends AbstractCloudwatchLogsInputPlugin diff --git a/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentials.java b/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentials.java index 84a80d6..4b1f1a4 100644 --- a/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentials.java +++ b/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentials.java @@ -25,7 +25,6 @@ private AwsCredentials() public static AWSCredentialsProvider getAWSCredentialsProvider(AwsCredentialsConfig task) { - String authenticationMethodOption = "authentication_method"; String awsSessionTokenOption = "aws_session_token"; String awsAccessKeyIdOption = "aws_access_key_id"; String awsSecretAccessKeyOption = "aws_secret_access_key"; @@ -66,7 +65,7 @@ public void refresh() invalid(task.getAwsProfileFile(), awsProfileFileOption); invalid(task.getAwsProfileName(), awsProfileNameOption); - return new InstanceProfileCredentialsProvider(); + return InstanceProfileCredentialsProvider.getInstance(); case "profile": { diff --git a/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsConfig.java b/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsConfig.java index 14fb20a..a566664 100644 --- a/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsConfig.java +++ b/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsConfig.java @@ -1,6 +1,6 @@ package org.embulk.input.cloudwatch_logs.aws; -import org.embulk.spi.unit.LocalFile; +import org.embulk.util.config.units.LocalFile; import java.util.Optional; diff --git a/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsTask.java b/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsTask.java index 50c072a..422bb6d 100644 --- a/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsTask.java +++ b/src/main/java/org/embulk/input/cloudwatch_logs/aws/AwsCredentialsTask.java @@ -1,8 +1,8 @@ package org.embulk.input.cloudwatch_logs.aws; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; -import org.embulk.spi.unit.LocalFile; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.config.units.LocalFile; import java.util.Optional; diff --git a/src/test/java/org/embulk/input/cloudwatch_logs/TestAwsCredentials.java b/src/test/java/org/embulk/input/cloudwatch_logs/TestAwsCredentials.java index 8f68b59..67595d8 100644 --- a/src/test/java/org/embulk/input/cloudwatch_logs/TestAwsCredentials.java +++ b/src/test/java/org/embulk/input/cloudwatch_logs/TestAwsCredentials.java @@ -2,17 +2,17 @@ import com.amazonaws.services.logs.AWSLogs; import com.amazonaws.services.logs.model.InputLogEvent; - -import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; +import org.embulk.input.cloudwatch_logs.TestHelpers.CloudWatchLogsTestUtils; import org.embulk.spi.InputPlugin; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; -import org.embulk.spi.TestPageBuilderReader.MockPageOutput; +import org.embulk.test.EmbulkTestRuntime; +import org.embulk.test.TestPageBuilderReader.MockPageOutput; import org.embulk.test.TestingEmbulk; - +import org.embulk.util.config.ConfigMapper; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -20,15 +20,12 @@ import org.junit.Test; import org.mockito.Mockito; -import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; -import org.embulk.input.cloudwatch_logs.TestHelpers; -import org.embulk.input.cloudwatch_logs.TestHelpers.CloudWatchLogsTestUtils; - import static org.embulk.input.cloudwatch_logs.CloudwatchLogsInputPlugin.CloudWatchLogsPluginTask; +import static org.embulk.input.cloudwatch_logs.TestHelpers.CONFIG_MAPPER_FACTORY; import static org.junit.Assume.assumeNotNull; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; @@ -47,16 +44,15 @@ public class TestAwsCredentials private CloudwatchLogsInputPlugin plugin; private ConfigSource config; - private MockPageOutput output = new MockPageOutput(); + private final MockPageOutput output = new MockPageOutput(); private PageBuilder pageBuilder; private String logGroupName; private String logStreamName; - private AWSLogs logsClient; private CloudWatchLogsTestUtils testUtils; - private static String EMBULK_LOGS_TEST_REGION; - private static String EMBULK_LOGS_TEST_ACCESS_KEY_ID; - private static String EMBULK_LOGS_TEST_SECRET_ACCESS_KEY; + private static String embulkLogsTestRegion; + private static String embulkLogsTestAccessKeyId; + private static String embulkLogsTestSecretAccessKey; /* * This test case requires environment variables: @@ -68,14 +64,14 @@ public class TestAwsCredentials @BeforeClass public static void initializeConstantVariables() { - EMBULK_LOGS_TEST_REGION = System.getenv("EMBULK_LOGS_TEST_REGION"); - EMBULK_LOGS_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_LOGS_TEST_ACCESS_KEY_ID"); - EMBULK_LOGS_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_LOGS_TEST_SECRET_ACCESS_KEY"); - assumeNotNull(EMBULK_LOGS_TEST_REGION, EMBULK_LOGS_TEST_ACCESS_KEY_ID, EMBULK_LOGS_TEST_SECRET_ACCESS_KEY); + embulkLogsTestRegion = System.getenv("EMBULK_LOGS_TEST_REGION"); + embulkLogsTestAccessKeyId = System.getenv("EMBULK_LOGS_TEST_ACCESS_KEY_ID"); + embulkLogsTestSecretAccessKey = System.getenv("EMBULK_LOGS_TEST_SECRET_ACCESS_KEY"); + assumeNotNull(embulkLogsTestRegion, embulkLogsTestAccessKeyId, embulkLogsTestSecretAccessKey); } @Before - public void setUp() throws IOException + public void setUp() { logGroupName = TestHelpers.getLogGroupName(); logStreamName = TestHelpers.getLogStreamName(); @@ -86,25 +82,26 @@ public void setUp() throws IOException .set("type", "cloudwatch_logs") .set("log_group_name", logGroupName) .set("log_stream_name", logStreamName) - .set("region", EMBULK_LOGS_TEST_REGION); + .set("region", embulkLogsTestRegion); pageBuilder = Mockito.mock(PageBuilder.class); doReturn(pageBuilder).when(plugin).getPageBuilder(Mockito.any(), Mockito.any()); } } @After - public void tearDown() throws IOException + public void tearDown() { if (testUtils != null) { testUtils.clearLogGroup(); } } - private void doTest(ConfigSource config) throws IOException + private void doTest(ConfigSource config) { - CloudWatchLogsPluginTask task = config.loadConfig(CloudWatchLogsPluginTask.class); - CloudwatchLogsInputPlugin plugin = runtime.getInstance(CloudwatchLogsInputPlugin.class); - logsClient = plugin.newLogsClient(task); + ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); + CloudWatchLogsPluginTask task = configMapper.map(config, CloudWatchLogsPluginTask.class); + CloudwatchLogsInputPlugin plugin = new CloudwatchLogsInputPlugin(); + AWSLogs logsClient = plugin.newLogsClient(task); testUtils = new CloudWatchLogsTestUtils(logsClient, logGroupName, logStreamName); testUtils.createLogStream(); @@ -120,7 +117,9 @@ private void doTest(ConfigSource config) throws IOException testUtils.putLogEvents(events); try { Thread.sleep(10000); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { + // NOP } plugin.transaction(config, new Control()); @@ -128,12 +127,12 @@ private void doTest(ConfigSource config) throws IOException } @Test - public void useBasic() throws IOException + public void useBasic() { ConfigSource config = this.config.deepCopy() .set("authentication_method", "basic") - .set("aws_access_key_id", EMBULK_LOGS_TEST_ACCESS_KEY_ID) - .set("aws_secret_access_key", EMBULK_LOGS_TEST_SECRET_ACCESS_KEY); + .set("aws_access_key_id", embulkLogsTestAccessKeyId) + .set("aws_secret_access_key", embulkLogsTestSecretAccessKey); doTest(config); } @@ -156,14 +155,14 @@ public void useProfile() } @Test - public void useProperties() throws IOException + public void useProperties() { String prevAccessKeyId = System.getProperty("aws.accessKeyId"); String prevSecretKey = System.getProperty("aws.secretKey"); try { ConfigSource config = this.config.deepCopy().set("authentication_method", "properties"); - System.setProperty("aws.accessKeyId", EMBULK_LOGS_TEST_ACCESS_KEY_ID); - System.setProperty("aws.secretKey", EMBULK_LOGS_TEST_SECRET_ACCESS_KEY); + System.setProperty("aws.accessKeyId", embulkLogsTestAccessKeyId); + System.setProperty("aws.secretKey", embulkLogsTestSecretAccessKey); doTest(config); } finally { diff --git a/src/test/java/org/embulk/input/cloudwatch_logs/TestCloudwatchLogsInputPlugin.java b/src/test/java/org/embulk/input/cloudwatch_logs/TestCloudwatchLogsInputPlugin.java index 2e3a4a1..2c4ca42 100644 --- a/src/test/java/org/embulk/input/cloudwatch_logs/TestCloudwatchLogsInputPlugin.java +++ b/src/test/java/org/embulk/input/cloudwatch_logs/TestCloudwatchLogsInputPlugin.java @@ -2,17 +2,17 @@ import com.amazonaws.services.logs.AWSLogs; import com.amazonaws.services.logs.model.InputLogEvent; - -import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; import org.embulk.config.TaskSource; +import org.embulk.input.cloudwatch_logs.TestHelpers.CloudWatchLogsTestUtils; import org.embulk.spi.InputPlugin; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; -import org.embulk.spi.TestPageBuilderReader.MockPageOutput; +import org.embulk.test.EmbulkTestRuntime; +import org.embulk.test.TestPageBuilderReader.MockPageOutput; import org.embulk.test.TestingEmbulk; - +import org.embulk.util.config.ConfigMapper; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -20,15 +20,12 @@ import org.junit.Test; import org.mockito.Mockito; -import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; -import org.embulk.input.cloudwatch_logs.TestHelpers; -import org.embulk.input.cloudwatch_logs.TestHelpers.CloudWatchLogsTestUtils; - import static org.embulk.input.cloudwatch_logs.CloudwatchLogsInputPlugin.CloudWatchLogsPluginTask; +import static org.embulk.input.cloudwatch_logs.TestHelpers.CONFIG_MAPPER_FACTORY; import static org.junit.Assume.assumeNotNull; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; @@ -47,16 +44,13 @@ public class TestCloudwatchLogsInputPlugin private CloudwatchLogsInputPlugin plugin; private ConfigSource config; - private MockPageOutput output = new MockPageOutput(); + private final MockPageOutput output = new MockPageOutput(); private PageBuilder pageBuilder; - private String logGroupName; - private String logStreamName; - private AWSLogs logsClient; private CloudWatchLogsTestUtils testUtils; - private static String EMBULK_LOGS_TEST_REGION; - private static String EMBULK_LOGS_TEST_ACCESS_KEY_ID; - private static String EMBULK_LOGS_TEST_SECRET_ACCESS_KEY; + private static String embulkLogsTestRegion; + private static String embulkLogsTestAccessKeyId; + private static String embulkLogsTestSecretAccessKey; /* * This test case requires environment variables: @@ -68,17 +62,17 @@ public class TestCloudwatchLogsInputPlugin @BeforeClass public static void initializeConstantVariables() { - EMBULK_LOGS_TEST_REGION = System.getenv("EMBULK_LOGS_TEST_REGION"); - EMBULK_LOGS_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_LOGS_TEST_ACCESS_KEY_ID"); - EMBULK_LOGS_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_LOGS_TEST_SECRET_ACCESS_KEY"); - assumeNotNull(EMBULK_LOGS_TEST_REGION, EMBULK_LOGS_TEST_ACCESS_KEY_ID, EMBULK_LOGS_TEST_SECRET_ACCESS_KEY); + embulkLogsTestRegion = System.getenv("EMBULK_LOGS_TEST_REGION"); + embulkLogsTestAccessKeyId = System.getenv("EMBULK_LOGS_TEST_ACCESS_KEY_ID"); + embulkLogsTestSecretAccessKey = System.getenv("EMBULK_LOGS_TEST_SECRET_ACCESS_KEY"); + assumeNotNull(embulkLogsTestRegion, embulkLogsTestAccessKeyId, embulkLogsTestSecretAccessKey); } @Before - public void setUp() throws IOException + public void setUp() { - logGroupName = TestHelpers.getLogGroupName(); - logStreamName = TestHelpers.getLogStreamName(); + String logGroupName = TestHelpers.getLogGroupName(); + String logStreamName = TestHelpers.getLogStreamName(); if (plugin == null) { plugin = Mockito.spy(new CloudwatchLogsInputPlugin()); @@ -86,26 +80,27 @@ public void setUp() throws IOException .set("type", "cloudwatch_logs") .set("log_group_name", logGroupName) .set("log_stream_name", logStreamName) - .set("region", EMBULK_LOGS_TEST_REGION) - .set("aws_access_key_id", EMBULK_LOGS_TEST_ACCESS_KEY_ID) - .set("aws_secret_access_key", EMBULK_LOGS_TEST_SECRET_ACCESS_KEY); + .set("region", embulkLogsTestRegion) + .set("aws_access_key_id", embulkLogsTestAccessKeyId) + .set("aws_secret_access_key", embulkLogsTestSecretAccessKey); pageBuilder = Mockito.mock(PageBuilder.class); } doReturn(pageBuilder).when(plugin).getPageBuilder(Mockito.any(), Mockito.any()); - CloudWatchLogsPluginTask task = config.loadConfig(CloudWatchLogsPluginTask.class); - CloudwatchLogsInputPlugin plugin = runtime.getInstance(CloudwatchLogsInputPlugin.class); - logsClient = plugin.newLogsClient(task); + ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); + CloudWatchLogsPluginTask task = configMapper.map(config, CloudWatchLogsPluginTask.class); + CloudwatchLogsInputPlugin plugin = new CloudwatchLogsInputPlugin(); + AWSLogs logsClient = plugin.newLogsClient(task); testUtils = new CloudWatchLogsTestUtils(logsClient, logGroupName, logStreamName); } @After - public void tearDown() throws IOException + public void tearDown() { testUtils.clearLogGroup(); } @Test - public void test_simple() throws IOException + public void test_simple() { testUtils.createLogStream(); @@ -120,7 +115,9 @@ public void test_simple() throws IOException testUtils.putLogEvents(events); try { Thread.sleep(10000); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { + // NOP } plugin.transaction(config, new Control()); @@ -131,11 +128,11 @@ public void test_simple() throws IOException @Test public void configuredRegion() { - CloudWatchLogsPluginTask task = this.config.deepCopy() + ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); + CloudWatchLogsPluginTask task = configMapper.map(config.deepCopy() .set("region", "ap-southeast-2") - .remove("endpoint") - .loadConfig(CloudWatchLogsPluginTask.class); - CloudwatchLogsInputPlugin plugin = runtime.getInstance(CloudwatchLogsInputPlugin.class); + .remove("endpoint"), CloudWatchLogsPluginTask.class); + CloudwatchLogsInputPlugin plugin = new CloudwatchLogsInputPlugin(); AWSLogs logsClient = plugin.newLogsClient(task); // Should not be null diff --git a/src/test/java/org/embulk/input/cloudwatch_logs/TestHelpers.java b/src/test/java/org/embulk/input/cloudwatch_logs/TestHelpers.java index 237dc4e..aa5eb43 100644 --- a/src/test/java/org/embulk/input/cloudwatch_logs/TestHelpers.java +++ b/src/test/java/org/embulk/input/cloudwatch_logs/TestHelpers.java @@ -1,12 +1,5 @@ package org.embulk.input.cloudwatch_logs; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.Date; -import java.util.List; -import java.util.UUID; - import com.amazonaws.services.logs.AWSLogs; import com.amazonaws.services.logs.model.CreateLogGroupRequest; import com.amazonaws.services.logs.model.CreateLogStreamRequest; @@ -14,9 +7,16 @@ import com.amazonaws.services.logs.model.InputLogEvent; import com.amazonaws.services.logs.model.PutLogEventsRequest; import com.amazonaws.services.logs.model.ResourceNotFoundException; +import org.embulk.util.config.ConfigMapperFactory; + +import java.util.Date; +import java.util.List; +import java.util.UUID; public final class TestHelpers { + public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); + private TestHelpers() {} public static String getLogGroupName() @@ -50,7 +50,8 @@ public void clearLogGroup() request.setLogGroupName(logGroupName); try { logs.deleteLogGroup(request); - } catch (ResourceNotFoundException ex) { + } + catch (ResourceNotFoundException ex) { // Just ignored. } } @@ -67,7 +68,8 @@ public void createLogStream() logs.createLogStream(streamRequest); } - public void putLogEvents(List events) { + public void putLogEvents(List events) + { PutLogEventsRequest request = new PutLogEventsRequest() .withLogGroupName(logGroupName) .withLogStreamName(logStreamName)