big data

 

cmd="sort -T /mnt/volume/work/tmp2 --batch-size=30 -m -n -u -t: -k4 ";
 for input in sortday/*; do 
   cmd="$cmd <(gunzip -c '$input')"; 
 done; 
time eval $cmd
 | python correct.py
 | tee >(gzip -9 > sortedFast.gz)
 | tee >( 
     grep \{\"type\":\"Point\",\"coordinates\": 
    | tee >( gzip > GeoPosPointClean.gz)
    | grep -v objectType.:.activity...verb.:.share.
    |gzip > GeoPosPointCleanNoshare.gz 
    ) 
 |tee >( jq -c 'select(.geo).geo.coordinates' > coordenadasBis.txt ) 
 | pv -rb 
 | time awk -F, '{print | "gzip > sortedFast.day"substr($5,23,2)".gz"}'

Lo que estamos viendo aqui arriba podria ser una tarea tipica de preparacion de datos: tenemos unos ficheros preordenados pero que podrian tener duplicados y errores, y queremos dejarlos limpios. Para colmo, todo esta comprimido.

Asi que lo primero que hay que hacer es crear un sort que admita un numero arbitrario de descompresiones. La solucion cmd=”$cmd <(gunzip -c ‘$input’)”  estaba en stackoverflow y nadie habia propuesto una mejor. En cuanto a los parametros del sort, algunos de ellos sobran; a no ser que pongamos un batchsize mas pequeño no se usará el tmp2, y lo demas es la especificación de la clave; lo importante es el -m que le anuncia que sus inputs estan ordenados, y el -u que elimina los duplicados.

En el final de la pipe, awk accede no a la clave de ordenamiento sino a algun campo que indique como separar los ficheros, y que en este caso existia y no nos ha obligado a decorar las pipes de entrada. Con ello volvemos a comprimir el output.

Entremedio, algun script, en este caso python correct.py, se encarga de eliminar o corregir los errores que pudiera haber en el formato de las lineas.

Para aprovechar el dia, se pueden utilizar algunos tee para realizar operaciones secundarias aprovechando el momento en que los datos estan descomprimidos; a fin de cuentas la descompresion va a ser el principal cuello de botella. Y lo del dia lo digo literalmente, es facil que tengamos ficheros que se tarda una o dos horas en descomprimir.

Es posible eliminar cuellos de botella de la compresion a base de pigz, de algun otro compresor paralelizado, o simplemente comprimiendo a cachos y luego juntando con un cat.  Si esto deja el cuello de botella en el script de parseo y correccion, conviene paralelizarlo con el comando parallel –pipe, el cual afortunadamente tiene una opcion, -k, que nos mantiene el ordenamiento producido por el sort.

cmd="sort -T /mnt/volume/work/tmp7 --batch-size=30 -m -n -u -t: -k4 ";
   for input in sortday/*; do 
     cmd="$cmd <(pigz -dc '$input')";
  done; 

time eval $cmd 
  | parallel -j 6 -k --pipe python correct.py 
  | pv -rb 
  | time awk -F, '{print | "pigz > sortedPfast.day"substr($5,23,2)".gz"}'

Para las pruebecillas iniciales de rendimiento conviene tener instalado pv, que medirá el ancho de banda que circula por la pipe.

 

Leave a Reply / Añade un comentario: