Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Oct 24, 2024
1 parent 5a4e966 commit cf35364
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public long crawl(long lastPollTime,
continue;
}
itemInfoList.add(nextItem);
Map<String, String> metadata = nextItem.getMetadata();
long niCreated = Long.parseLong(metadata.get(CREATED)!=null? metadata.get(CREATED):"0");
long niUpdated = Long.parseLong(metadata.get(UPDATED)!=null? metadata.get(UPDATED):"0");
Map<String, Object> metadata = nextItem.getMetadata();
long niCreated = Long.parseLong(metadata.get(CREATED)!=null? (String)metadata.get(CREATED):"0");
long niUpdated = Long.parseLong(metadata.get(UPDATED)!=null? (String)metadata.get(UPDATED):"0");
updatedPollTime = Math.max(updatedPollTime, niCreated);
updatedPollTime = Math.max(updatedPollTime, niUpdated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,19 @@ public interface SaasClient {
Iterator<ItemInfo> listItems();


/**
* Method to set the last time we polled the service to check for any changes.
*
* @param lastPollTime time in milliseconds
*/
void setLastPollTime(long lastPollTime);

/**
* Method for executing a particular partition or a chunk of work
*
* @param state worker node state holds the details of this particular chunk of work
* @param buffer pipeline buffer to write the results into
* @param sourceConfig pipeline configuration from the yaml
*/
void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, SaasSourceConfig sourceConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import lombok.Getter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
Expand Down Expand Up @@ -52,7 +51,6 @@ public abstract class SaasSourcePlugin implements Source<Record<Event>>, UsesEnh
private final Crawler crawler;


@DataPrepperPluginConstructor
public SaasSourcePlugin(final PluginMetrics pluginMetrics,
final SaasSourceConfig sourceConfig,
final PluginFactory pluginFactory,
Expand Down Expand Up @@ -95,11 +93,6 @@ public void stop() {
this.executorService.shutdownNow();
}

@Override
public boolean areAcknowledgementsEnabled() {
return Source.super.areAcknowledgementsEnabled();
}


@Override
public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordinator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public abstract class ItemInfo {
* contents itself which can be used to apply regex filtering, change data capture etc. general
* assumption here is that fetching metadata should be faster than fetching entire Item
*/
Map<String, String> metadata;
Map<String, Object> metadata;

/**
* Process your change log events serially (preferably in a single thread) and ensure that you are
Expand All @@ -37,7 +37,7 @@ public ItemInfo(String itemId) {
this.itemId = itemId;
}

public ItemInfo(@NonNull String itemId, Map<String, String> metadata, @NonNull Long eventTime) {
public ItemInfo(@NonNull String itemId, Map<String, Object> metadata, @NonNull Long eventTime) {
this.itemId = itemId;
this.metadata = metadata;
this.eventTime = eventTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessExcep
}

private static class TestItemInfo extends ItemInfo {
public TestItemInfo(String itemId, Map<String, String> metadata, Long eventTime) {
public TestItemInfo(String itemId, Map<String, Object> metadata, Long eventTime) {
super(itemId, metadata, eventTime);

}
Expand All @@ -178,7 +178,7 @@ public Map<String, String> getKeyAttributes() {
}

private ItemInfo createTestItemInfo(String id, String created, String updated) {
Map<String, String> metadata = new HashMap<>();
Map<String, Object> metadata = new HashMap<>();
if (created != null) metadata.put(Crawler.CREATED, created);
if (updated != null) metadata.put(Crawler.UPDATED, updated);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ItemInfoTest {

static class TestItemInfo extends ItemInfo {

public TestItemInfo(@NonNull String itemId, Map<String, String> metadata, @NonNull Long eventTime) {
public TestItemInfo(@NonNull String itemId, Map<String, Object> metadata, @NonNull Long eventTime) {
super(itemId, metadata, eventTime);
}

Expand Down

0 comments on commit cf35364

Please sign in to comment.