Skip to content

Commit

Permalink
Init
Browse files Browse the repository at this point in the history
eberhardtj committed Aug 22, 2018

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
0 parents commit 97e07d8
Showing 23 changed files with 1,558 additions and 0 deletions.
141 changes: 141 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
= metafacture-solr-plugin
:toc:

A plugin for link:https://github.com/metafacture/metafacture-core[metafacture] that extends the standard XML module.

== Build

```
gradlew test fatJar
```

Produces `metafacture-solr-VERSION-plugin.jar` in `build/libs` .

Place the build JAR inside the `plugins` directory of your `metafacture-core` distribution.

== Command Reference

|===
|Command | In | Out

|build-solr-doc
|StreamReceiver
|SolrDocumentReceiver

|handle-solr-xml
|XmlReceiver
|SolrDocumentReceiver

|to-solr
|SolrDocumentReceiver
|Void

|===

=== build-solr-doc

==== Description

Builds a Solr Input Document from metafacture stream events.

The following metafacture events

----
startRecord("ignored")
literal("id", "1")
literal("key", "value")
endRecord()
----

would create the following Solr Document

----
{"id": "1", "key": "value" }
----

Atomic index updates are handled by the `entity` event.

----
startRecord("ignored")
literal("id", "1")
entity("add")
literal("name", "alice")
literal("name", "bob")
endEntity()
endRecord()
----

creates the following Solr Document

----
{"id": "1", "name": {"add": ["alice", "bob"]}}
----

See also link:https://lucene.apache.org/solr/guide/7_1/updating-parts-of-documents.html[Updating Parts of Documents].

==== Syntax

```
build-solr-doc
```

==== Example

Flux:

```
... | build-solr-doc | to-solr(...);
```

=== handle-solr-xml

==== Description

A XML handler for Solr Index Updates.

==== Syntax

```
handle-solr-xml
```

==== Example

Flux:

```
> | decode-xml | handle-solr-xml | ...
```

=== to-solr

==== Description

A sink that commits Solr Input Documents to a Apache Solr instance.

==== Syntax

```
to-solr(url, [core], [batchSize], [commitWithinMs], [threads])
```

==== Parameters

* `url`: URL to Solr Server.
* `core`: Solr Core (Default: default)
* `batchSize`: Number of documents per commit (Default: 1).
* `commitWithinMs`: Max time (in ms) before a commit will happen (Default: 500).
* `threads`: Number of threads for concurrent batch processing (Default: 1).

==== Example

Minimal case:

```
... | to-solr("https://example.com/solr/", core="test");
```

```
... | to-solr(url="https://example.com/solr/", core="test",
batchSize="2", commitWithinMs="1000", threads="2");
```
31 changes: 31 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
group 'com.github.eberhardtj'
version '0.1.0'

apply plugin: 'java'

sourceCompatibility = 1.8
targetCompatibility = 1.8
compileJava.options.encoding = 'UTF-8'

repositories {
mavenCentral()
}

dependencies {
implementation 'org.metafacture:metafacture-framework:5.0.0'
compile 'org.codehaus.jcsp:jcsp:1.1-rc5'
compile 'org.apache.solr:solr-solrj:7.4.0'
testImplementation 'junit:junit:4.12'
testImplementation 'org.metafacture:metafacture-xml:5.0.0'
testImplementation 'org.metafacture:metafacture-io:5.0.0'
testImplementation 'org.metafacture:metafacture-strings:5.0.0'
testImplementation 'org.mockito:mockito-core:2.5.5'
}

task fatJar(type: Jar) {
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
archiveName = archivesBaseName.replace('-plugin', '') + '-' + version + '-plugin' + '.jar'
with jar
}
Binary file added gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
5 changes: 5 additions & 0 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
172 changes: 172 additions & 0 deletions gradlew
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#!/usr/bin/env sh

##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################

# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null

APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

warn () {
echo "$*"
}

die () {
echo
echo "$*"
echo
exit 1
}

# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar

# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi

# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi

# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi

# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option

if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=$((i+1))
done
case $i in
(0) set -- ;;
(1) set -- "$args0" ;;
(2) set -- "$args0" "$args1" ;;
(3) set -- "$args0" "$args1" "$args2" ;;
(4) set -- "$args0" "$args1" "$args2" "$args3" ;;
(5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
(6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
(7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
(8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
(9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi

# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=$(save "$@")

# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"

# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
cd "$(dirname "$0")"
fi

exec "$JAVACMD" "$@"
84 changes: 84 additions & 0 deletions gradlew.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################

@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal

set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=

@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome

set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe

if exist "%JAVA_EXE%" goto init

echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.

goto fail

:init
@rem Get command-line arguments, handling Windows variants

if not "%OS%" == "Windows_NT" goto win9xME_args

:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2

:win9xME_args_slurp
if "x%~1" == "x" goto execute

set CMD_LINE_ARGS=%*

:execute
@rem Setup the command line

set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar

@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%

:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd

:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1

:mainEnd
if "%OS%"=="Windows_NT" endlocal

:omega
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rootProject.name = 'metafacture-solr-plugin'
20 changes: 20 additions & 0 deletions src/main/java/org/metafacture/framework/SolrDocumentPipe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.framework;

public interface SolrDocumentPipe<R extends Receiver> extends SolrDocumentReceiver, Sender<R> {
// Just a combination of sender and receiver
}
23 changes: 23 additions & 0 deletions src/main/java/org/metafacture/framework/SolrDocumentReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.framework;

import org.apache.solr.common.SolrInputDocument;
import org.metafacture.framework.ObjectReceiver;

public interface SolrDocumentReceiver extends ObjectReceiver<SolrInputDocument> {
// Just a combination of LifeCycle and the corresponding ObjectReceiver
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.framework.helpers;

import org.apache.solr.common.SolrInputDocument;
import org.metafacture.framework.Receiver;
import org.metafacture.framework.SolrDocumentPipe;

public class DefaultSolrDocumentPipe <R extends Receiver> extends DefaultSender<R> implements SolrDocumentPipe<R> {

public void process(final SolrInputDocument obj) {
// Default implementation does nothing
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.framework.helpers;

import org.apache.solr.common.SolrInputDocument;
import org.metafacture.framework.ObjectReceiver;

public class DefaultSolrDocumentReceiver extends DefaultLifeCycle implements ObjectReceiver<SolrInputDocument> {

@Override
public void process(final SolrInputDocument obj) {
// Default implementation does nothing
}
}

46 changes: 46 additions & 0 deletions src/main/java/org/metafacture/solr/HashMapList.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.metafacture.solr;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HashMapList<K,V> {
private Map<K, List<V>> hashList;

public HashMapList() {
hashList = new HashMap<>();
}

public void add(K key, V value) {
if (!hashList.containsKey(key)) {
hashList.put(key, new ArrayList<>());
}

List<V> list = hashList.get(key);
list.add(value);
}

public List<V> get(K key) {
return hashList.get(key);
}

public Map<K,List<V>> asMap() {
return hashList;
}

public boolean containsKey(K key) {
return hashList.containsKey(key);
}

public boolean isEmpty() {
return hashList.isEmpty();
}

@Override
public String toString() {
return "HashMapList{" +
"hashList=" + hashList +
'}';
}
}
92 changes: 92 additions & 0 deletions src/main/java/org/metafacture/solr/SolrCommitProcess.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.solr;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.jcsp.lang.CSProcess;
import org.jcsp.lang.ChannelInput;
import org.jcsp.lang.PoisonException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SolrCommitProcess implements CSProcess {
private ChannelInput<SolrInputDocument> in;
private SolrClient client;
private String collection;
private int batchSize;
private int commitWithinMs;

public SolrCommitProcess(ChannelInput<SolrInputDocument> in,
SolrClient client,
String collection)
{
this.in = in;
this.client = client;
this.collection = collection;
this.batchSize = 1;
this.commitWithinMs = 500;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

public void setCommitWithinMs(int commitWithinMs) {
this.commitWithinMs = commitWithinMs;
}

@Override
public void run() {
List<SolrInputDocument> batch = new ArrayList<>(batchSize);

while (true) {
SolrInputDocument document;
try {
document = receive();
batch.add(document);
} catch (PoisonException e) {
if (!batch.isEmpty()) commit(batch);
break;
}

if (batch.size() == batchSize) {
commit(batch);
batch = new ArrayList<>(batchSize);
}
}
}

private boolean commit(List<SolrInputDocument> documents) {
try {
UpdateResponse response = client.add(collection, documents, commitWithinMs);
if (response.getStatus() != 0) {
return false;
}
} catch (IOException|SolrServerException e) {
return false;
}
return true;
}

private SolrInputDocument receive() {
return in.read();
}
}
93 changes: 93 additions & 0 deletions src/main/java/org/metafacture/solr/SolrDocumentBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.solr;

import org.apache.solr.common.SolrInputDocument;
import org.metafacture.framework.FluxCommand;
import org.metafacture.framework.ObjectReceiver;
import org.metafacture.framework.SolrDocumentReceiver;
import org.metafacture.framework.StreamReceiver;
import org.metafacture.framework.annotations.Description;
import org.metafacture.framework.annotations.In;
import org.metafacture.framework.annotations.Out;
import org.metafacture.framework.helpers.DefaultStreamPipe;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Description("Builds a Solr Document from metafacture stream events.")
@In(StreamReceiver.class)
@Out(SolrDocumentReceiver.class)
@FluxCommand("build-solr-doc")
public class SolrDocumentBuilder extends DefaultStreamPipe<ObjectReceiver<SolrInputDocument>> {

SolrInputDocument document;
String updateMethod;
String updateFieldName;
List<String> updateFieldValues;

public SolrDocumentBuilder() {
updateMethod = "";
updateFieldValues = new ArrayList<>();
}

@Override
public void startRecord(String identifier) {
document = new SolrInputDocument();
}

@Override
public void endRecord() {
getReceiver().process(document);
}

@Override
public void startEntity(String name) {
updateMethod = name;
}

@Override
public void endEntity() {
if (!updateMethod.isEmpty()) {
Map<String,Object> atomicUpdateAction = new HashMap<>();
atomicUpdateAction.put(updateMethod, new ArrayList<>(updateFieldValues));
document.addField(updateFieldName, atomicUpdateAction);
}
updateMethod = "";
}

@Override
public void literal(String name, String value) {
if (updateMethod.isEmpty()) {
document.addField(name, value);
} else {
updateFieldName = name;
updateFieldValues.add(value);
}
}

@Override
public void onResetStream() {
updateFieldValues.clear();
}

@Override
public void onCloseStream() {
updateFieldValues.clear();
}
}
136 changes: 136 additions & 0 deletions src/main/java/org/metafacture/solr/SolrWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.solr;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.jcsp.lang.Channel;
import org.jcsp.lang.ChannelOutput;
import org.jcsp.lang.One2AnyChannel;
import org.jcsp.lang.Parallel;
import org.jcsp.util.Buffer;
import org.metafacture.framework.FluxCommand;
import org.metafacture.framework.SolrDocumentReceiver;
import org.metafacture.framework.annotations.Description;
import org.metafacture.framework.annotations.In;
import org.metafacture.framework.annotations.Out;
import org.metafacture.framework.helpers.DefaultSolrDocumentReceiver;

@Description("Adds documents to a Solr core.")
@In(SolrDocumentReceiver.class)
@Out(Void.class)
@FluxCommand("to-solr")
public class SolrWriter extends DefaultSolrDocumentReceiver {

/** Solr Server URL */
private String url;
private String core;

private SolrClient client;
/** Number of document per commit */
private int batchSize;
/** Time range in which a commit will happen. */
private int commitWithMs;

/** Number of threads to run in parallel */
int threads;
One2AnyChannel<SolrInputDocument> documentChannel;
ChannelOutput<SolrInputDocument> documentChannelOutput;

private Thread workerThread;

/** Flag for a hook that acts before the first processing occurs. */
private boolean onStartup;

public SolrWriter(String url) {
this.url = url;
this.core = "default";
this.threads = 1;
this.batchSize = 1;
this.commitWithMs = 500;
this.onStartup = true;
}

public void setCore(String core) {
this.core = core;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

public void setCommitWithMs(int commitWithMs) {
this.commitWithMs = commitWithMs;
}

public void setThreads(int threads) {
this.threads = threads;
}

@Override
public void process(SolrInputDocument document) {
if (onStartup) {
HttpSolrClient httpClient = new HttpSolrClient.Builder()
.withBaseSolrUrl(url)
.allowCompression(true)
.build();
httpClient.setRequestWriter(new BinaryRequestWriter());
client = httpClient;

documentChannel = Channel.one2any(new Buffer<>(2 * threads), threads);

Parallel parallel = new Parallel();
for (int i = 0; i < threads; i++) {
SolrCommitProcess process = new SolrCommitProcess(documentChannel.in(), client, core);
process.setBatchSize(batchSize);
process.setCommitWithinMs(commitWithMs);
parallel.addProcess(process);
}

documentChannelOutput = documentChannel.out();

onStartup = false;

workerThread = new Thread(new Runnable() {
@Override
public void run() {
parallel.run();
}
});
workerThread.start();
}

documentChannelOutput.write(document);
}

@Override
public void resetStream() {
onStartup = true;
documentChannelOutput.poison(threads);
}

@Override
public void closeStream() {
documentChannelOutput.poison(threads);
try {
workerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
144 changes: 144 additions & 0 deletions src/main/java/org/metafacture/solr/SolrXmlHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.solr;

import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.metafacture.framework.*;
import org.metafacture.framework.annotations.Description;
import org.metafacture.framework.annotations.In;
import org.metafacture.framework.annotations.Out;
import org.metafacture.framework.helpers.DefaultXmlPipe;
import org.xml.sax.Attributes;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Converts XML Solr Documents for Index Updates to a Metafacture Stream.
*
* Encodes atomic index updates. See also: https://wiki.apache.org/solr/UpdateXmlMessages#Optional_attributes_for_.22add.22
*/
@Description("A handler for XML formatted index updates for Apache Solr.")
@In(XmlReceiver.class)
@Out(SolrDocumentReceiver.class)
@FluxCommand("handle-solr-xml")
public class SolrXmlHandler extends DefaultXmlPipe<ObjectReceiver<SolrInputDocument>>
{
private static final String DOC = "doc";
private static final String FIELD = "field";
private static final String NO_MODIFICATION = "";

private SolrInputDocument solrDocument;

/** Document depth (documents in a document). */
private int documentDepth = 0;

/** Stores the characters of the current XML element. */
private StringBuilder characters = new StringBuilder();

/** Local name of the current XML element */
private String currentElement = "";

/** Value of the name attribute for a field element */
private String fieldName;
/** A flag that indicates a atomic update */

private boolean isModified;
/**
* Field modifier. May be 'inc', 'add', 'set', 'remove' or 'removeregexp'.
* @see <a href="https://lucene.apache.org/solr/guide/7_0/updating-parts-of-documents.html#atomic-updates">Atomic Updates</a>
*/
private String fieldModifier;

private Map<String, HashMapList<String,String>> fieldUpdates;

public SolrXmlHandler() {
this.isModified = false;
}

public void startElement(final String uri, final String localName,
final String qName, final Attributes attributes) {
currentElement = localName;
switch (localName) {
case DOC:
solrDocument = new SolrInputDocument();
fieldUpdates = new HashMap<>();
documentDepth++;
if (documentDepth == 2) {
throw new MetafactureException("Nested documents are not supported!");
}
break;
case FIELD:
fieldName = attributes.getValue("name");
isModified = attributes.getValue("update") != null;
if (isModified) fieldModifier = attributes.getValue("update");
break;
default:
break;
}
}

@Override
public void endElement(final String uri, final String localName, final String qName) {
currentElement = localName;

if (currentElement.equals(DOC)) {
if (documentDepth == 1) {
if (!fieldUpdates.isEmpty()) {
for (Map.Entry<String, HashMapList<String,String>> entry : fieldUpdates.entrySet()) {
String fieldName = entry.getKey();
Map<String,List<String>> atomicUpdates = entry.getValue().asMap();
solrDocument.addField(fieldName, atomicUpdates);
}
}
getReceiver().process(solrDocument);
reset();
} else {
throw new MetafactureException("Nested documents are not supported!");
}
documentDepth--;
} else if (currentElement.equals(FIELD)) {
String fieldValue = characters.toString().trim();
if (!isModified) {
solrDocument.addField(fieldName, fieldValue);
} else {
if (!fieldUpdates.containsKey(fieldName)) {
HashMapList<String,String> updates = new HashMapList<>();
updates.add(fieldModifier, fieldValue);
fieldUpdates.put(fieldName, updates);
} else {
HashMapList<String,String> updates = fieldUpdates.get(fieldName);
updates.add(fieldModifier, fieldValue);
}
}
}
characters.setLength(0);
}

@Override
public void characters(final char[] chars, final int start, final int length) {
characters.append(chars, start, length);
}

private void reset() {
solrDocument = new SolrInputDocument();
fieldUpdates.clear();
fieldModifier = NO_MODIFICATION;
}
}
18 changes: 18 additions & 0 deletions src/main/resources/flux-commands.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright 2018 Deutsche Nationalbibliothek
#
# Licensed under the Apache License, Version 2.0 the "License";
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
handle-solr-xml org.metafacture.solr.SolrXmlHandler
build-solr-doc org.metafacture.solr.SolrDocumentBuilder
to-solr org.metafacture.solr.SolrWriter
51 changes: 51 additions & 0 deletions src/test/java/org/metafacture/solr/FakeSolrClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.metafacture.solr;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;

import java.io.IOException;
import java.util.*;

/**
* A simple class that mocks a SolrClient. Stores added document in memory.
*/
public class FakeSolrClient extends SolrClient {

Map<String,List<SolrInputDocument>> storage;

public FakeSolrClient() {
this.storage = new HashMap<>();
}

public List<SolrInputDocument> getCollection(String collection) {
return storage.getOrDefault(collection, new ArrayList<>());
}

@Override
public UpdateResponse add(String collection, Collection<SolrInputDocument> docs, int commitWithinMs) throws SolrServerException, IOException {
if (!storage.containsKey(collection)) storage.put(collection, new ArrayList<>());
List<SolrInputDocument> list = storage.get(collection);
list.addAll(docs);

UpdateResponse updateResponse = new UpdateResponse();
updateResponse.setElapsedTime(0);
NamedList<Object> response = new NamedList<>();
response.add("status", 0);
updateResponse.setResponse(response);
return updateResponse;
}

@Override
public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
throw new UnsupportedOperationException();
}

@Override
public void close() throws IOException {
throw new UnsupportedOperationException();
}
}
49 changes: 49 additions & 0 deletions src/test/java/org/metafacture/solr/HashMapListTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.metafacture.solr;

import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.Map;

import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;

public class HashMapListTest {

private HashMapList<String,String> hashMapList;

@Before
public void setUp() throws Exception {
hashMapList = new HashMapList<>();
}

@Test
public void shouldReturnList() {
hashMapList.add("key", "value");
assertThat(hashMapList.get("key"), is(instanceOf(List.class)));
}

@Test
public void shouldContainValue() {
hashMapList.add("key", "value1");
hashMapList.add("key", "value2");
List<String> values = hashMapList.get("key");
assertThat(values, hasItems("value1", "value2"));
}

@Test
public void containsKey() {
hashMapList.add("key", "value");
assertThat(hashMapList.containsKey("key"), equalTo(true));
}

@Test
public void asMap() {
hashMapList.add("key", "value");
Map<String,List<String>> map = hashMapList.asMap();
assertThat(map.containsKey("key"), equalTo(true));
assertThat(map.get("key"), hasItem("value"));
}
}
48 changes: 48 additions & 0 deletions src/test/java/org/metafacture/solr/ObjectBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.solr;

import org.metafacture.framework.ObjectReceiver;

/**
* Stores the last received object. A buffer of size one.
*/
public class ObjectBuffer<T> implements ObjectReceiver<T> {

private T obj;

public ObjectBuffer() {
}

@Override
public void resetStream() {
this.obj = null;
}

@Override
public void closeStream() {

}

@Override
public void process(T obj) {
this.obj = obj;
}

T getObject() {
return obj;
}
}
97 changes: 97 additions & 0 deletions src/test/java/org/metafacture/solr/SolrCommitProcessTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.solr;

import org.apache.solr.common.SolrInputDocument;
import org.jcsp.lang.*;
import org.jcsp.util.Buffer;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;

public class SolrCommitProcessTest {

private FakeSolrClient client;
private One2AnyChannel<SolrInputDocument> documentChannel;

@Before
public void setUp() throws Exception {
client = new FakeSolrClient();
documentChannel = Channel.one2any(new Buffer<>(1), 1);
}

@Test
public void shouldTerminate() {
CSProcess send = new SendProcess(documentChannel.out(), new ArrayList<>());
CSProcess commit = new SolrCommitProcess(documentChannel.in(), client, "test");
Parallel parallel = new Parallel();
parallel.addProcess(send);
parallel.addProcess(commit);
parallel.run();
}

@Test
public void shouldReceiveDocuments() {
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField("id", "1");

SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField("id", "2");

SolrInputDocument doc3 = new SolrInputDocument();
doc3.addField("id", "3");

List<SolrInputDocument> docs = Stream.of(doc1, doc2, doc3).collect(Collectors.toList());

CSProcess send = new SendProcess(documentChannel.out(), docs);
SolrCommitProcess commit = new SolrCommitProcess(documentChannel.in(), client, "test");
commit.setBatchSize(2);

Parallel parallel = new Parallel();
parallel.addProcess(send);
parallel.addProcess(commit);
parallel.run();

List<SolrInputDocument> collection = client.getCollection("test");
assertThat(collection.size(), is(equalTo(3)));
assertThat(collection, hasItems(doc1, doc2, doc3));
}

/** A simple producer process that puts elements into a channel. */
class SendProcess implements CSProcess {

private ChannelOutput<SolrInputDocument> chan;
private List<SolrInputDocument> documents;

public SendProcess(ChannelOutput<SolrInputDocument> chan, List<SolrInputDocument> documents) {
this.chan = chan;
this.documents = documents;
}

@Override
public void run() {
documents.forEach(chan::write);
chan.poison(1);
}
}
}
107 changes: 107 additions & 0 deletions src/test/java/org/metafacture/solr/SolrDocumentBuilderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2018 Deutsche Nationalbibliothek
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.metafacture.solr;

import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;

public class SolrDocumentBuilderTest {

private SolrDocumentBuilder builder;
private ObjectBuffer<SolrInputDocument> buffer;

@Before
public void setUp()
{

buffer = new ObjectBuffer<SolrInputDocument>();
builder = new SolrDocumentBuilder();
builder.setReceiver(buffer);
}

@Test
public void shouldIgnoreRecordId() {
builder.startRecord("ignored");
builder.literal("name", "alice");
builder.endRecord();
builder.closeStream();

SolrInputDocument document = buffer.getObject();
assertThat(document.getFieldNames().size(), is(equalTo(1)));
assertThat(document.getFieldNames(), hasItems("name"));
}

@Test
public void shouldContainSingleValueField() {
builder.startRecord("id1");
builder.literal("name", "alice");
builder.endRecord();
builder.closeStream();

SolrInputDocument document = buffer.getObject();
SolrInputField field = document.getField("name");
assertThat(field.getValueCount(), is(equalTo(1)));
assertThat(field.getFirstValue(), is(equalTo("alice")));
}

@Test
public void shouldContainMultiValueField() {
builder.startRecord("id1");
builder.literal("name", "alice");
builder.literal("name", "bob");
builder.endRecord();
builder.closeStream();

SolrInputDocument document = buffer.getObject();
SolrInputField field = document.getField("name");
assertThat(field.getValueCount(), is(equalTo(2)));
assertThat(field.getValues(), hasItems("alice", "bob"));
}

@Test
@SuppressWarnings("unchecked")
public void shouldContainAtomicUpdates() {
builder.startRecord("id1");
builder.startEntity("add");
builder.literal("name", "alice");
builder.literal("name", "bob");
builder.endEntity();
builder.endRecord();
builder.closeStream();

SolrInputDocument document = buffer.getObject();

assertThat(document.getFieldNames(), hasItems("name"));

SolrInputField field = document.getField("name");
assertThat(field.getValueCount(), is(equalTo(1)));

Object value = field.getFirstValue();
assertThat(value, is(instanceOf(Map.class)));

Map<String,List<String>> valueMap = (Map<String,List<String>>)value;
assertThat(valueMap.keySet(), hasItems("add"));
assertThat(valueMap.get("add"), hasItems("alice", "bob"));
}
}
145 changes: 145 additions & 0 deletions src/test/java/org/metafacture/solr/SolrXmlHandlerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package org.metafacture.solr;

import org.apache.solr.common.SolrInputDocument;
import org.junit.Before;
import org.junit.Test;
import org.xml.sax.Attributes;
import org.xml.sax.helpers.AttributesImpl;

import javax.xml.namespace.QName;

import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.function.Predicate.isEqual;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.*;

public class SolrXmlHandlerTest {

private SolrXmlHandler handler;
private ObjectBuffer<SolrInputDocument> buffer;

@Before
public void setUp() throws Exception {
buffer = new ObjectBuffer<>();
handler = new SolrXmlHandler();
handler.setReceiver(buffer);
}

@Test
public void singleDocument() {
startRoot(handler);
startDocument(handler);
addField(handler, "name", "alice");
addField(handler, "alias", "bob");
endDocument(handler);
endRoot(handler);

buffer.closeStream();
SolrInputDocument document = buffer.getObject();

assertThat(document.getFieldNames(), hasItems("name", "alias"));
assertThat(document.getField("name").getValue(), equalTo("alice"));
assertThat(document.getField("alias").getValue(), equalTo("bob"));
}

@Test
public void multiValueDocument() {
startRoot(handler);
startDocument(handler);
addField(handler, "name", "alice");
addField(handler, "name", "bob");
endDocument(handler);
endRoot(handler);

buffer.closeStream();
SolrInputDocument document = buffer.getObject();

assertThat(document.getFieldNames(), hasItems("name"));
assertThat(document.getField("name").getValueCount(), equalTo(2));
assertThat(document.getField("name").getValues(), hasItems("alice", "bob"));
}

@Test
public void singleUpdateDocument() {
startRoot(handler);
startDocument(handler);
addField(handler, "id", "1");
addField(handler, "name", "alice", "add");
endDocument(handler);
endRoot(handler);

buffer.closeStream();
SolrInputDocument document = buffer.getObject();

assertThat(document.getFieldNames(), hasItems("id", "name"));
assertThat(document.getFieldValue("id"), equalTo("1"));

HashMap<String, List<String>> atomicUpdates = new HashMap<>();
atomicUpdates.put("add", Stream.of("alice").collect(Collectors.toList()));
assertThat(document.getFieldValue("name"), equalTo(atomicUpdates));
}

@Test
public void multiUpdateDocument() {
startRoot(handler);
startDocument(handler);
addField(handler, "id", "1");
addField(handler, "name", "alice", "add");
addField(handler, "name", "bob", "add");
addField(handler, "age", "20", "set");
endDocument(handler);
endRoot(handler);

buffer.closeStream();
SolrInputDocument document = buffer.getObject();

assertThat(document.getFieldNames(), hasItems("id", "name"));
assertThat(document.getFieldValue("id"), equalTo("1"));

HashMap<String, List<String>> atomicUpdatesNames = new HashMap<>();
atomicUpdatesNames.put("add", Stream.of("alice", "bob").collect(Collectors.toList()));
assertThat(document.getFieldValue("name"), equalTo(atomicUpdatesNames));

HashMap<String, List<String>> atomicUpdatesAge = new HashMap<>();
atomicUpdatesAge.put("set", Stream.of("20").collect(Collectors.toList()));
assertThat(document.getFieldValue("age"), equalTo(atomicUpdatesAge));
}

private void startRoot(SolrXmlHandler handler) {
handler.startElement("", "add", "add", new AttributesImpl());
}

private void startDocument(SolrXmlHandler handler) {
handler.startElement("", "doc", "doc", new AttributesImpl());
}

private void addField(SolrXmlHandler handler, String name, String value) {
AttributesImpl atts = new AttributesImpl();
atts.addAttribute("", "name", "name", "CDATA", name);
handler.startElement("", "field", "field", atts);
handler.characters(value.toCharArray(), 0, value.length());
handler.endElement("", "field", "field");
}

private void addField(SolrXmlHandler handler, String name, String value, String updateMethod) {
AttributesImpl atts = new AttributesImpl();
atts.addAttribute("", "name", "name", "CDATA", name);
atts.addAttribute("", "update", "update", "CDATA", updateMethod);
handler.startElement("", "field", "field", atts);
handler.characters(value.toCharArray(), 0, value.length());
handler.endElement("", "field", "field");
}

private void endDocument(SolrXmlHandler handler) {
handler.endElement("", "doc", "doc");
}

private void endRoot(SolrXmlHandler handler) {
handler.endElement("", "add", "add");
}
}

0 comments on commit 97e07d8

Please sign in to comment.