Skip to content

Commit

Permalink
Merge pull request #22 from lyft/spark-bug
Browse files Browse the repository at this point in the history
SPARK-33790 Reduce rpc call to SingleFileEventLogFileReader
  • Loading branch information
s-pedamallu authored Jan 14, 2021
2 parents 9b8991f + 216d5a7 commit 354dbbf
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object EventLogFileReader {
lastIndex: Option[Long]): EventLogFileReader = {
lastIndex match {
case Some(_) => new RollingEventLogFilesFileReader(fs, path)
case None => new SingleFileEventLogFileReader(fs, path, None)
case None => new SingleFileEventLogFileReader(fs, path)
}
}

Expand All @@ -116,7 +116,7 @@ object EventLogFileReader {

def apply(fs: FileSystem, status: FileStatus): Option[EventLogFileReader] = {
if (isSingleEventLog(status)) {
Some(new SingleFileEventLogFileReader(fs, status.getPath, Some(status)))
Some(new SingleFileEventLogFileReader(fs, status.getPath, Option(status)))
} else if (isRollingEventLogs(status)) {
Some(new RollingEventLogFilesFileReader(fs, status.getPath))
} else {
Expand Down Expand Up @@ -164,13 +164,11 @@ object EventLogFileReader {
* FileNotFoundException could occur if the log file is renamed before getting the
* status of log file.
*/
class SingleFileEventLogFileReader(
private[history] class SingleFileEventLogFileReader(
fs: FileSystem,
path: Path, fileStatus: Option[FileStatus]) extends EventLogFileReader(fs, path) {
private lazy val status = fileStatus match {
case Some(s) => s
case None => fileSystem.getFileStatus(rootPath)
}
path: Path,
maybeStatus: Option[FileStatus] = None) extends EventLogFileReader(fs, path) {
private lazy val status = maybeStatus.getOrElse(fileSystem.getFileStatus(rootPath))

override def lastIndex: Option[Long] = None

Expand Down Expand Up @@ -206,7 +204,7 @@ class SingleFileEventLogFileReader(
* This reader lists the files only once; if caller would like to play with updated list,
* it needs to create another reader instance.
*/
class RollingEventLogFilesFileReader(
private[history] class RollingEventLogFilesFileReader(
fs: FileSystem,
path: Path) extends EventLogFileReader(fs, path) {
import RollingEventLogFilesWriter._
Expand Down

0 comments on commit 354dbbf

Please sign in to comment.