
Commit

code clean up and show duplicate lidvids
Al Niessner authored and Al Niessner committed Jan 10, 2025
1 parent 0989f54 commit 6264548
Showing 4 changed files with 155 additions and 157 deletions.
2 changes: 2 additions & 0 deletions src/main/java/gov/nasa/pds/harvest/cmd/HarvestCmd.java
@@ -14,6 +14,7 @@
import gov.nasa.pds.harvest.crawler.FilesProcessor;
import gov.nasa.pds.harvest.crawler.ProductProcessor;
import gov.nasa.pds.harvest.crawler.RefsCache;
import gov.nasa.pds.harvest.dao.RegistryDocBatch;
import gov.nasa.pds.harvest.dao.RegistryManager;
import gov.nasa.pds.harvest.meta.XPathCacheLoader;
import gov.nasa.pds.harvest.util.CounterMap;
@@ -277,6 +278,7 @@ private void printSummary()
{
Counter counter = RegistryManager.getInstance().getCounter();

RegistryDocBatch.showDuplicates();
log.log(LogUtils.LEVEL_SUMMARY, "Summary:");
int processedCount = counter.prodCounters.getTotal();

27 changes: 10 additions & 17 deletions src/main/java/gov/nasa/pds/harvest/crawler/FilesProcessor.java
@@ -84,24 +84,17 @@ public boolean test(Path path, BasicFileAttributes attrs)
* @param dir Directory with PDS4 labels
* @throws Exception Generic exception
*/
public void processDirectory(File dir) throws Exception
{
Stream<Path> stream = null;

try
{
stream = Files.find(dir.toPath(), Integer.MAX_VALUE, new FileMatcher(), FileVisitOption.FOLLOW_LINKS);
Iterator<Path> it = stream.iterator();

while(it.hasNext())
{
onFile(it.next().toFile());
}
}
finally
{
CloseUtils.close(stream);
public void processDirectory(File dir) throws Exception {
try (Stream<Path> stream = Files.find(
dir.toPath(),
Integer.MAX_VALUE,
new FileMatcher(), FileVisitOption.FOLLOW_LINKS)) {
Iterator<Path> it = stream.iterator();

while(it.hasNext()) {
onFile(it.next().toFile());
}
}
}


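For reference, the refactored processDirectory relies on try-with-resources to close the Files.find stream rather than the explicit CloseUtils.close call in the finally block. A minimal, self-contained sketch of the same pattern (the directory path and label filter below are illustrative, not Harvest code):

import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

class FindLabelsSketch {
    public static void main(String[] args) throws IOException {
        Path dir = Paths.get("/tmp/pds4-labels"); // hypothetical input directory
        // The stream holds open directory handles; try-with-resources closes it
        // automatically, even if per-file processing throws.
        try (Stream<Path> stream = Files.find(dir, Integer.MAX_VALUE,
                (path, attrs) -> path.toString().endsWith(".xml"),
                FileVisitOption.FOLLOW_LINKS)) {
            stream.forEach(System.out::println);
        }
    }
}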
255 changes: 116 additions & 139 deletions src/main/java/gov/nasa/pds/harvest/dao/MetadataWriter.java
@@ -17,154 +17,131 @@
import gov.nasa.pds.registry.common.meta.Metadata;


public class MetadataWriter implements Closeable
{
private final static String WARN_SKIP_PRE = "Skipping registered product ";
private final static String WARN_SKIP_POST = " (LIDVID/LID already exists in registry database)";
private final static int ES_DOC_BATCH_SIZE = 50;
private final ConnectionFactory conFact;
private Logger log;

private RegistryDao registryDao;
private DataLoader loader;
private RegistryDocBatch docBatch;
private String jobId;

private int totalRecords;
private boolean overwriteExisting = false;

private Counter counter;

/**
* Constructor
* @param cfg registry configuration
* @throws Exception an exception
*/
public MetadataWriter(ConnectionFactory conFact, RegistryDao dao, Counter counter) throws Exception
{
this.conFact = conFact;
log = LogManager.getLogger(this.getClass());
loader = new DataLoader(conFact);
docBatch = new RegistryDocBatch();
jobId = PackageIdGenerator.getInstance().getPackageId();

this.registryDao = dao;
this.counter = counter;
}
public class MetadataWriter implements Closeable {
private final static String WARN_SKIP_PRE = "Skipping registered product ";
private final static String WARN_SKIP_POST = " (LIDVID/LID already exists in registry database)";
private final static int ES_DOC_BATCH_SIZE = 50;
private final ConnectionFactory conFact;
private Logger log;


public void setOverwriteExisting(boolean b)
{
this.overwriteExisting = b;
}


public void write(Metadata meta) throws Exception
{
docBatch.write(this.conFact, meta, jobId);

if(docBatch.size() % ES_DOC_BATCH_SIZE == 0)
{
writeBatch();
}
private RegistryDao registryDao;
private DataLoader loader;
private RegistryDocBatch docBatch;
private String jobId;

private int totalRecords;
private boolean overwriteExisting = false;

private Counter counter;

/**
* Constructor
*
* @param cfg registry configuration
* @throws Exception an exception
*/
public MetadataWriter(ConnectionFactory conFact, RegistryDao dao, Counter counter)
throws Exception {
this.conFact = conFact;
log = LogManager.getLogger(this.getClass());
loader = new DataLoader(conFact);
docBatch = new RegistryDocBatch();
jobId = PackageIdGenerator.getInstance().getPackageId();
this.registryDao = dao;
this.counter = counter;
}


public void setOverwriteExisting(boolean b) {
this.overwriteExisting = b;
}


public void write(Metadata meta) throws Exception {
docBatch.write(this.conFact, meta, jobId);

if (docBatch.size() % ES_DOC_BATCH_SIZE == 0) {
writeBatch();
}
}


private void writeBatch() throws Exception
{
if(docBatch.isEmpty()) return;

Set<String> nonRegisteredIds = null;
if(!overwriteExisting)
{
List<String> batchLidVids = docBatch.getLidVids();
nonRegisteredIds = registryDao.getNonExistingIds(batchLidVids);
if(nonRegisteredIds == null || nonRegisteredIds.isEmpty())
{
for(String lidvid: batchLidVids)
{
log.warn(WARN_SKIP_PRE + lidvid + WARN_SKIP_POST);
counter.skippedFileCount++;
}

docBatch.clear();
return;
}
}

// Build JSON documents for Elasticsearch
List<String> data = new ArrayList<>();

for(RegistryDocBatch.NJsonItem item: docBatch.getItems())
{
if(nonRegisteredIds == null)
{
addItem(data, item);
}
else
{
if(nonRegisteredIds.contains(item.lidvid))
{
addItem(data, item);
}
else
{
log.warn(WARN_SKIP_PRE + item.lidvid + WARN_SKIP_POST);
counter.skippedFileCount++;
}
}
}

// Load batch
Set<String> failedIds = new TreeSet<>();
totalRecords += loader.loadBatch(data, failedIds);
log.info("Wrote " + totalRecords + " product(s)");

// Update failed counter
counter.failedFileCount += failedIds.size();

// Update product counters
for(RegistryDocBatch.NJsonItem item: docBatch.getItems())
{
if((nonRegisteredIds == null && !failedIds.contains(item.lidvid)) ||
(nonRegisteredIds != null && nonRegisteredIds.contains(item.lidvid) && !failedIds.contains(item.lidvid)))
{
counter.prodCounters.inc(item.prodClass);
}
private void writeBatch() throws Exception {
if (docBatch.isEmpty())
return;

Set<String> nonRegisteredIds = null;
if (!overwriteExisting) {
List<String> batchLidVids = docBatch.getLidVids();
nonRegisteredIds = registryDao.getNonExistingIds(batchLidVids);
if (nonRegisteredIds == null || nonRegisteredIds.isEmpty()) {
for (String lidvid : batchLidVids) {
log.warn(WARN_SKIP_PRE + lidvid + WARN_SKIP_POST);
counter.skippedFileCount++;
}

// Clear batch
docBatch.clear();
return;
}
}


private void addItem(List<String> data, RegistryDocBatch.NJsonItem item)
{
data.add(item.pkJson);
data.add(item.dataJson);
// Build JSON documents for Elasticsearch
List<String> data = new ArrayList<>();

for (RegistryDocBatch.NJsonItem item : docBatch.getItems()) {
if (nonRegisteredIds == null) {
addItem(data, item);
} else {
if (nonRegisteredIds.contains(item.lidvid)) {
addItem(data, item);
} else {
log.warn(WARN_SKIP_PRE + item.lidvid + WARN_SKIP_POST);
counter.skippedFileCount++;
}
}
}


public void flush() throws Exception
{
writeBatch();

// Load batch
Set<String> failedIds = new TreeSet<>();
totalRecords += loader.loadBatch(data, failedIds);
log.info("Wrote " + totalRecords + " product(s)");

// Update failed counter
counter.failedFileCount += failedIds.size();

// Update product counters
for (RegistryDocBatch.NJsonItem item : docBatch.getItems()) {
if ((nonRegisteredIds == null && !failedIds.contains(item.lidvid))
|| (nonRegisteredIds != null && nonRegisteredIds.contains(item.lidvid)
&& !failedIds.contains(item.lidvid))) {
counter.prodCounters.inc(item.prodClass);
}
}


@Override
public void close() throws IOException
{
try
{
flush();
}
catch(IOException ex)
{
throw ex;
}
catch(Exception ex)
{
throw new IOException(ex);
}

// Clear batch
docBatch.clear();
}


private void addItem(List<String> data, RegistryDocBatch.NJsonItem item) {
RegistryDocBatch.increment(item.lidvid);
data.add(item.pkJson);
data.add(item.dataJson);
}


public void flush() throws Exception {
writeBatch();
}


@Override
public void close() throws IOException {
try {
flush();
} catch (IOException ex) {
throw ex;
} catch (Exception ex) {
throw new IOException(ex);
}
}
}
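The reorganized writeBatch keeps the same skip behavior: unless overwriteExisting is set, only lidvids that the registry reports as not yet existing are loaded, and the rest are logged and counted as skipped. A self-contained sketch of that decision, with illustrative lidvids (not the Harvest classes themselves):

import java.util.List;
import java.util.Set;

class SkipRegisteredSketch {
    // Mirrors the writeBatch decision: load only ids the registry does not have yet,
    // unless overwriteExisting forces every document through.
    static void writeBatch(List<String> batchLidVids, Set<String> nonRegisteredIds,
                           boolean overwriteExisting) {
        for (String lidvid : batchLidVids) {
            if (overwriteExisting || nonRegisteredIds.contains(lidvid)) {
                System.out.println("load " + lidvid);
            } else {
                System.out.println("skip " + lidvid + " (already in registry)");
            }
        }
    }

    public static void main(String[] args) {
        writeBatch(List.of("a::1.0", "b::1.0"), Set.of("b::1.0"), false);
        // prints: skip a::1.0 (already in registry), then load b::1.0
    }
}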
28 changes: 27 additions & 1 deletion src/main/java/gov/nasa/pds/harvest/dao/RegistryDocBatch.java
@@ -1,8 +1,10 @@
package gov.nasa.pds.harvest.dao;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import gov.nasa.pds.registry.common.ConnectionFactory;
@@ -26,7 +28,8 @@ public static class NJsonItem

}
final private static HashSet<String> alreadyLearned = new HashSet<String>();
final private Logger log = LogManager.getLogger(RegistryDocBatch.class);
final private static HashMap<String,Integer> history = new HashMap<String,Integer>();
final private static Logger log = LogManager.getLogger(RegistryDocBatch.class);
private List<NJsonItem> items;


@@ -109,4 +112,27 @@ public List<String> getLidVids()
items.forEach((item) -> { ids.add(item.lidvid); } );
return ids;
}
static public void increment(String lidvid) {
Integer count = history.containsKey(lidvid) ? history.get(lidvid) : 0;
history.put(lidvid, ++count);
}
static public void showDuplicates() {
boolean first = true;
for (Entry<String,Integer> entry : history.entrySet()) {
if (entry.getValue() > 1) {
if (first) {
log.fatal("The harvested collection has duplicate lidvids. Double check content of these lidvids:");
first = false;
}
log.fatal(" Found " + entry.getValue() + " of lidvid " + entry.getKey());
}
}
if (!first) {
int total = 0;
for (Integer count : history.values()) {
total += count;
}
log.fatal(" Total number of duplicate lidvids: " + (total - history.size()));
}
}
}
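The new static increment/showDuplicates pair is what HarvestCmd.printSummary now calls at the end of a run: MetadataWriter.addItem increments a per-lidvid counter for every document queued, and showDuplicates reports any lidvid counted more than once. Note that the reported total, the sum of all counts minus the number of distinct lidvids, is the number of extra (duplicate) writes, since each lidvid contributes count - 1 to that difference. A runnable, self-contained sketch of the same counting logic (the lidvids are illustrative, not Harvest code):

import java.util.HashMap;
import java.util.Map;

class DuplicateTrackerSketch {
    private static final Map<String, Integer> history = new HashMap<>();

    static void increment(String lidvid) {
        history.merge(lidvid, 1, Integer::sum); // count every write of this lidvid
    }

    static void showDuplicates() {
        int total = 0;
        for (Map.Entry<String, Integer> entry : history.entrySet()) {
            if (entry.getValue() > 1) {
                System.out.println("Found " + entry.getValue() + " of lidvid " + entry.getKey());
            }
            total += entry.getValue();
        }
        System.out.println("Total duplicate writes: " + (total - history.size()));
    }

    public static void main(String[] args) {
        increment("urn:nasa:pds:bundle:collection:p1::1.0");
        increment("urn:nasa:pds:bundle:collection:p1::1.0"); // duplicate
        increment("urn:nasa:pds:bundle:collection:p2::1.0");
        showDuplicates(); // reports the p1 lidvid; total duplicate writes = 1
    }
}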
