Custom Logs in Apache Spark

Have you ever felt the frustration of a Spark job that runs for hours and then fails due to an infrastructure issue?

You learn about the failure very late and waste a couple of hours on it, and it hurts even more when the Spark UI logs are not available for the postmortem.

You are not alone!

In this post I will go over how to enable your own custom logger that works well with the Spark logger.

This custom logger will collect whatever information is required to go from reactive to proactive monitoring.

There is no need to set up extra logging infrastructure for this.

Spark 2.x logs through the SLF4J abstraction. It ships with a Log4j binding by default; the setup described in this post uses the Logback binding instead.

Let's start with the logging basics: how to get a logger instance in a Spark job or application.

val _LOG = LoggerFactory.getLogger(this.getClass.getName)

It is that simple, and now your application is using the same logging library and settings that Spark is based on.

Now, to do something more meaningful, we have to inject our custom logger, which will collect information and write it to Elasticsearch, post it to some REST endpoint, or send alerts.

Let's go through this step by step.

Build custom log appender

Since this setup is based on Logback, we have to write a Logback appender.

Code snippet for the custom Logback appender

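package micro.logback

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.BiFunction

import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.UnsynchronizedAppenderBase

class MetricsLogbackAppender extends UnsynchronizedAppenderBase[ILoggingEvent] {

  // Number of messages seen so far, keyed by the name of the logging thread
  val messageCount = new ConcurrentHashMap[String, AtomicInteger]()

  // Creates the counter on first use, then increments it
  val mergeValue = new BiFunction[String, AtomicInteger, AtomicInteger] {
    def apply(key: String, currentValue: AtomicInteger): AtomicInteger = {
      val nextValue = currentValue match {
        case null => new AtomicInteger(0)
        case _ => currentValue
      }
      nextValue.incrementAndGet()
      nextValue
    }
  }

  override def append(e: ILoggingEvent): Unit = {
    // Send this message to Elasticsearch or a REST endpoint
    messageCount.compute(Thread.currentThread().getName, mergeValue)
    System.out.println(messageCount + " " + e)
  }

}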

This is a very simple appender that counts messages per thread; all you have to do is override the append function.

An appender like this can do anything: write to a database, send to a REST endpoint, or raise alerts.
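The per-thread counting can also be tried out on its own, without Logback on the classpath. The sketch below is only an illustration: MessageCounter, record and count are hypothetical names that are not part of the original code, and it uses putIfAbsent instead of compute to reach the same result.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not from the original post): the counting logic in isolation.
object MessageCounter {
  private val counts = new ConcurrentHashMap[String, AtomicInteger]()

  // Increments the counter for `key` and returns the new count.
  def record(key: String): Int = {
    val fresh = new AtomicInteger(0)
    // putIfAbsent returns null when `fresh` was stored, otherwise the existing counter
    val existing = counts.putIfAbsent(key, fresh)
    val counter = if (existing == null) fresh else existing
    counter.incrementAndGet()
  }

  // Returns the current count for `key`, or 0 if nothing was recorded for it.
  def count(key: String): Int = {
    val counter = counts.get(key)
    if (counter == null) 0 else counter.get()
  }
}
```

Inside append this would be called as MessageCounter.record(e.getThreadName). Taking the thread name from the event rather than from Thread.currentThread() may be the safer choice once the appender is made asynchronous, because the appending thread is then no longer the thread that logged the message.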

Enable logger

To use the new appender, create a logback.xml file and add an entry for it.

This file can be packed in the shaded jar or specified as a runtime parameter.
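For the runtime-parameter route, one possible approach is to ship the file with spark-submit and point Logback at it through its logback.configurationFile system property. The sketch below only builds the argument strings and does not call Spark. LogbackSubmitArgs and the example path are hypothetical, and it assumes client deploy mode, where the driver reads the file from its local path while executors find the copy shipped via --files in their working directory.

```scala
// Hypothetical helper (not from the original post): builds spark-submit arguments as plain strings.
object LogbackSubmitArgs {

  // Logback reads this system property to locate its configuration file.
  def jvmOption(configFile: String): String =
    s"-Dlogback.configurationFile=$configFile"

  // `localPath` is an assumed location of logback.xml on the submitting machine.
  def build(localPath: String): Seq[String] = {
    // Executors see the shipped file under its bare name
    val fileName = localPath.split('/').last
    Seq(
      "--files", localPath,
      "--conf", s"spark.driver.extraJavaOptions=${jvmOption(localPath)}",
      "--conf", s"spark.executor.extraJavaOptions=${jvmOption(fileName)}"
    )
  }
}
```

Calling LogbackSubmitArgs.build("/etc/spark/logback.xml") gives the six arguments to append to the spark-submit command line. Whether the file is actually picked up also depends on the Logback binding being the one on the classpath.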

Sample logback.xml

<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="METRICS" class="micro.logback.MetricsLogbackAppender"/>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>

    <logger level="info" name="micro" additivity="true">
        <appender-ref ref="METRICS" />
    </logger>

    <logger level="info" name="org.apache.spark.scheduler.DAGScheduler" additivity="true">
        <appender-ref ref="METRICS" />
    </logger>

</configuration>

This config file registers MetricsLogbackAppender under the name METRICS:

<appender name="METRICS" class="micro.logback.MetricsLogbackAppender"/>

Next, it enables the appender for the packages and classes that should use it:

<logger level="info" name="micro" additivity="true">
    <appender-ref ref="METRICS" />
</logger>

<logger level="info" name="org.apache.spark.scheduler.DAGScheduler" additivity="true">
    <appender-ref ref="METRICS" />
</logger>

You are done!

Any message logged from the 'micro' package or from the DAGScheduler class will go through the new appender.

Using this technique, executor logs can also be captured, which becomes very useful when a Spark job is running on hundreds or thousands of executors.

This opens up lots of options: BI that shows all these messages in real time, letting the team ask interesting questions, or subscribing to alerts when things are not going well.

Caution: Make sure that this new logger is not slowing down application execution; making it asynchronous is recommended.
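One way to keep the logging thread fast is to hand each message to a background thread through a bounded queue. The sketch below shows the idea with plain JDK classes. AsyncForwarder, sink and dropped are hypothetical names that are not part of the original code, and dropping messages when the queue is full is an assumed policy, not a recommendation from the original post. Logback also ships its own AsyncAppender, which can wrap an existing appender from logback.xml and may be the simpler choice.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not from the original post): forwards messages on a background thread.
class AsyncForwarder(capacity: Int, sink: String => Unit) {
  private val queue = new ArrayBlockingQueue[String](capacity)

  // Number of messages rejected because the queue was full
  val dropped = new AtomicLong(0)

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      try {
        // take() blocks until a message is available
        while (true) sink(queue.take())
      } catch {
        case _: InterruptedException => () // stop() was called
      }
    }
  }, "async-forwarder")
  worker.setDaemon(true) // do not keep the JVM alive because of logging
  worker.start()

  // Never blocks the caller: when the queue is full the message is dropped and counted.
  def submit(message: String): Boolean = {
    val accepted = queue.offer(message)
    if (!accepted) dropped.incrementAndGet()
    accepted
  }

  def stop(): Unit = worker.interrupt()
}
```

Inside append, the appender would then only call something like forwarder.submit(e.getFormattedMessage), and the slow work (the REST call or the Elasticsearch write) would live in sink. In this sketch an exception thrown by sink ends the worker thread, so a real sink should handle its own failures.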

Get the insight at the right time and turn it into action.

The code used in this blog is available in the sparkmicroservices repo on GitHub.

I am interested in knowing what logging patterns you are using for Spark.

Published on Java Code Geeks with permission by Ashkrit Sharma, partner at our JCG program. See the original article here: Custom Logs in Apache Spark

Opinions expressed by Java Code Geeks contributors are their own.

Ashkrit Sharma

Pragmatic software developer who loves practices that make software development fun and likes to develop high-performance, low-latency systems.