Versiones de Spark y bugs de Scala

Estas semanas he estado aprendiendo Scala y montando alguna cosilla para funcionar en Spark, y ha tocado portarla al cluster. La primera nuisance ha sido que al haber decidido, pobres de nosotros, trabajar con MapR hemos tenido que esperar a que hubiera una version compatible con Hive y Yarn, que ha resultado finalmente ser Spark 1.2.1. En cuanto a la version de Scala, no me atrevo a pasar de la 2.10.4, que es la que utilizan en la distro.

Se supone que podemos funcionar bien compilando todo nosotros y pasando como variable la configuracion, o bien lanzando con spark-submit --master yarn. Lo primero todavia no he conseguido que funcione, asi que me he agarrado a lo segundo, y os cuento aqui los problemas de compatibilidad que he tenido entre la version desarrollada en mi maquina y la compilacion para el submit en el cluster.

1) No hay ninguna garantia de que --jars adjunte el resto de librerias, asi que mejor empaquetarlas en el jar de la aplicacion misma. Esto es un poco latoso porque el sbt no sabe hacerlo o no explica cómo, asi que no queda mas remedio que adosar el resto de las clases que necesitemos via jar -u o algun mecanismo similar. Supongo que se podra construir como receta en el propio sbt pero no he visto algo estandarizado.

2) Sorpresita, hay un bug en la closure de scala cuando el codigo esta directamente en el tipico “object extends App” en vez de en un metodo. Asi que hay que inventarse o bien un def main, o bien simplemente un metodo anonimo poniendo otros corchetes:

... extends App {{ ....  }} 

3) De alguna manera, una serializacion que funcionaba en mi ordenador no funciona en el cluster, a pesar de que en teoria estan compilando con la misma version de Scala. Puede que el assembly de spark esté en realidad compilado con otra version, o puede que haya dependencias de la version de java, pero me inclino a pensar que la historia esta en el scala. El asunto es que  orElse produce una funcion compuesta que no es serializable.  No me ha quedado mas remedio que sustituir un elegante

f = g orElse f

(donde f es var y g es val) por un mamotreto donde digo explicitamente que el resultado es serializable

val f0 = f
f = new PartialFunction[Int, Float]
        with Serializable {
  override def apply(i: Int): Float
    = { if (g.isDefinedAt(i)) g(i) else f0(i) }
  override def isDefinedAt(i: Int): Boolean
    = { g.isDefinedAt(i) || f0.isDefinedAt(i) }
}

Es posible que los desarrolladores de Scala hayan dudado sobre la serializacion del orElse debido a la ambigüedad de la recursividad. En el primer caso se asume que la “f” es la funcion antigua, siguiendo la tradicion de interpretar que la parte derecha de una asignacion se evalua antes de modificar la izquierda:  a = a +1 y todo eso que tanto lia a los novatos en programación. En cambio, al menos en el Scala que esta usando mi cluster, la “definicion explita” de un orElse asume que la “f” es la misma que se esta asignando, y entra en un bucle recursivo, asi que hay que fijar la version anterior de la funcion con un val f0.

Y de momento no tengo mas apaños. Una vez serializadas las cosas, los RDD del Spark parece que funcionan perfectamente y el codigo ya puede ejecutarse con todos los recursos que el yarn ponga a nuestra disposicion. Se queda la pelota en las manos del administrador de sistema. Que de momento tambien soy yo 🙁

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.