TPSpark/ 0000755 0072212 0000144 00000000000 12676531437 013142 5 ustar julien.nauroy users TPSpark/pom.xml 0000644 0072212 0000144 00000004461 12676531342 014457 0 ustar julien.nauroy users 4.0.0fr.upsudTPSparkjar1.0TP Sparkscala-tools.orgScala-tools Maven2 Repositoryhttp://scala-tools.org/repo-releasesscala-tools.orgScala-tools Maven2 Repositoryhttp://scala-tools.org/repo-releasesorg.scala-toolsmaven-scala-plugin2.15.2compilecompilecompiletest-compiletestCompiletest-compileprocess-resourcescompilemaven-compiler-plugin3.31.7org.apache.sparkspark-core_2.101.5.0org.apache.sparkspark-sql_2.101.5.0org.scala-langscala-library2.10.4
TPSpark/src/ 0000755 0072212 0000144 00000000000 12556737453 013734 5 ustar julien.nauroy users TPSpark/src/main/ 0000755 0072212 0000144 00000000000 12556755654 014663 5 ustar julien.nauroy users TPSpark/src/main/scala/ 0000755 0072212 0000144 00000000000 12676465356 015746 5 ustar julien.nauroy users TPSpark/src/main/scala/Exercice6.scala 0000644 0072212 0000144 00000002335 12675601053 020554 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 6 : Completez le code a trous ci-dessous de maniere a calculer la temperature moyenne par annee
object Exercice6 {
def main(args: Array[String]) {
// Suppression d'une partie des messages d'information tres verbeux
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
// En dehors de spark-shell, il faut declarer manuellement le contexte
val conf = new SparkConf().setAppName("demo")
val sc = new SparkContext(conf)
// Verification du chemin en entree
if(args.length != 1) {
println("Usage: Exercice6 ")
System.exit(1)
}
// Chargement des fichiers du NCDC
val lines = sc.textFile(args(0))
// Interpretation de chaque ligne pour creer une entree NCDCData
val records = lines.map(s => new NCDCData(s))
// Filtrage des donnees invalides
val filtered = records.filter(data => (data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]")))
// A partir d'ici, trouvez votre propre methode !
// TODO
// Affichage des resultats
// TODO
}
}
TPSpark/src/main/scala/Exercice2.scala 0000644 0072212 0000144 00000002502 12676437735 020563 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 2 : Completez le code a trous ci-dessous de maniere a calculer la temperature maximale par annee.
// Inspirez-vous du code fourni en Exercice 1
object Exercice2 {
def main(args: Array[String]) {
// Suppression d'une partie des messages d'information tres verbeux
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
// En dehors de spark-shell, il faut declarer manuellement le contexte
val conf = new SparkConf().setAppName("demo")
val sc = new SparkContext(conf)
// Verification du chemin en entree
if(args.length != 1) {
println("Usage: Exercice2 ")
System.exit(1)
}
// Chargement des fichiers du NCDC
val lines = sc.textFile(args(0))
// Interpretation de chaque ligne pour creer une entree NCDCData
val records = lines.map(s => new NCDCData(s))
// Filtrage des donnees invalides
val filtered = records.filter(data => (data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]")))
// Creation des couples (annee, temperature)
// TODO
// Calcul de la temperature maximale par cle
// TODO
// Affichage des resultats
// TODO
}
}
TPSpark/src/main/scala/Exercice4_2.scala 0000644 0072212 0000144 00000003251 12676461326 021001 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 4.2 : Completez le code a trous ci-dessous de maniere a calculer les temperatures maximale et minimale en une passe par annee.
// Dans cet exercice vous utiliserez la methode aggregateByKey
object Exercice4_2 {
def main(args: Array[String]) {
// Suppression d'une partie des messages d'information tres verbeux
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
// En dehors de spark-shell, il faut declarer manuellement le contexte
val conf = new SparkConf().setAppName("demo")
val sc = new SparkContext(conf)
// Verification du chemin en entree
if(args.length != 1) {
println("Usage: Exercice4_2 ")
System.exit(1)
}
// Chargement des fichiers du NCDC
val lines = sc.textFile(args(0))
// Interpretation de chaque ligne pour creer une entree NCDCData
val records = lines.map(s => new NCDCData(s))
// Filtrage des donnees invalides
val filtered = records.filter(data => (data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]")))
// Creation des couples (annee, temperature)
// TODO
// Calcul des temperatures minimale et maximale par cle
// La methode aggregateByKey s'ecrit de la facon suivante :
// aggregateByKey(U zerovalue)(seqOp : (U,V) => U, combOp : (U, U) => U)
// Aide : U est la valeur de depart (Int.MaxValue, Int.MinValue), seqOp compare un tuple (min, max) avec un entier, combOp compare deux tuples (min, max)
// TODO
// Affichage des resultats
// TODO
}
}
TPSpark/src/main/scala/Exercice3.scala 0000644 0072212 0000144 00000002675 12675577705 020600 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 3 : Completez le code a trous ci-dessous de maniere a calculer la temperature maximale par couple (annee, ID de station).
// Inspirez-vous du code fourni en Exercice 1 et des indices fournis dans le code
object Exercice3 {
def main(args: Array[String]) {
// Suppression d'une partie des messages d'information tres verbeux
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
// En dehors de spark-shell, il faut declarer manuellement le contexte
val conf = new SparkConf().setAppName("demo")
val sc = new SparkContext(conf)
// Verification du chemin en entree
if(args.length != 1) {
println("Usage: Exercice3 ")
System.exit(1)
}
// Chargement des fichiers du NCDC
val lines = sc.textFile(args(0))
// Interpretation de chaque ligne pour creer une entree NCDCData
val records = lines.map(s => new NCDCData(s))
// Filtrage des donnees invalides
val filtered = records.filter(data => (data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]")))
// Creation des couples ((annee, ID), temperature)
// Les "tuples" scala s'ecrivent tel quel : (v1, v2)
// TODO
// Calcul de la temperature maximale par cle
// TODO
// Affichage des resultats
// TODO
}
}
TPSpark/src/main/scala/NCDCData.scala.old 0000644 0072212 0000144 00000001347 12675424217 021024 0 ustar julien.nauroy users case class NCDCData(
var USAFID: String,
var year: Integer ,
var month: Integer,
var day: Integer,
var hour: Integer,
var minute: Integer,
var stationName: String,
var airTemperature: Integer,
var airTemperatureQuality: String
) {
def this(s: String) = this(
s.substring(4, 10),
Integer.parseInt(s.substring(15, 19)),
Integer.parseInt(s.substring(19, 21)),
Integer.parseInt(s.substring(21, 23)),
Integer.parseInt(s.substring(23, 25)),
Integer.parseInt(s.substring(25, 27)),
s.substring(51, 56),
Integer.parseInt(s.substring(87, 92)),
s.substring(92, 93)
)
}
object NCDCData {
def apply(s: String) = new NCDCData(s)
}
TPSpark/src/main/scala/Exercice7.scala 0000644 0072212 0000144 00000003552 12675601205 020556 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 7 :
// 1. Analysez le code ci-dessous qui met en place SparkSQL pour effectuer des requetes SQL sur vos donnees
// 2. Refaites les exercices precedents a la suite de la requete de demonstration en utilisant des requetes SQL
// on definit une nouvelle classe plus appropriee a son interpretation par spark SQL
case class NCDCData2(
USAFID: String,
year: Integer,
month: Integer,
day: Integer, hour: Integer,
minute: Integer,
stationName: String,
airTemperature: Integer,
airTemperatureQuality: String
)
object Exercice7 {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("Exercice7")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// Recuperation des records
val records = sc.textFile("/ncdc/lite/*").map(s => NCDCData2(
s.substring(4, 10),
Integer.parseInt(s.substring(15, 19)),
Integer.parseInt(s.substring(19, 21)),
Integer.parseInt(s.substring(21, 23)),
Integer.parseInt(s.substring(23, 25)),
Integer.parseInt(s.substring(25, 27)),
s.substring(51, 56),
Integer.parseInt(s.substring(87, 92)),
s.substring(92, 93)
))
// Conversion en DataFrame
val recordsDF = records.toDF()
// Enregistrement du DataFrame sous forme de table SparkSQL
recordsDF.registerTempTable("records")
val query = sqlContext.sql("""SELECT USAFID, max(airTemperature) FROM records
WHERE airTemperature != 9999 AND airTemperatureQuality rlike "[01459]"
GROUP BY USAFID ORDER BY USAFID""")
query.collect().foreach(println)
}
}
TPSpark/src/main/scala/Exercice4_1.scala 0000644 0072212 0000144 00000003446 12675557554 021016 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 4.1 : Completez le code a trous ci-dessous de maniere a calculer les temperatures maximale et minimale en une passe par annee.
// Dans cet exercice vous creerez des tuples (temp, temp) et calculerez la valeur minimale de l'element de gauche et maximale de l'element de droite.
object Exercice4_1 {
def main(args: Array[String]) {
// Suppression d'une partie des messages d'information tres verbeux
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
// En dehors de spark-shell, il faut declarer manuellement le contexte
val conf = new SparkConf().setAppName("demo")
val sc = new SparkContext(conf)
// Verification du chemin en entree
if(args.length != 1) {
println("Usage: Exercice4_1 ")
System.exit(1)
}
// Chargement des fichiers du NCDC
val lines = sc.textFile(args(0))
// Interpretation de chaque ligne pour creer une entree NCDCData
val records = lines.map(s => new NCDCData(s))
// Filtrage des donnees invalides
val filtered = records.filter(data => (data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]")))
// Creation des couples (annee, (temperature, temperature))
// TODO
// Calcul des temperatures minimale et maximale par cle
// Notez que les valeurs d'un tuple peuvent etre accedees de deux facons. Par exemple :
// 1. reduceByKey((t1, t2) => math.max(t1._1, t2._1))
// 2. reduceByKey{case ((x1, x2), (y1, y2)) => math.max(x1, y1)}
// Ces exemples ne constituent qu'une partie de la solution et ne sont pas directement fonctionels
// TODO
// Affichage des resultats
// TODO
}
}
TPSpark/src/main/scala/Exercice1.scala 0000644 0072212 0000144 00000003720 12676437711 020557 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 1 :
// 1. Executez ce programme a l'aide de la commande "./run.sh Exercice1 /ncdc/lite"
// 2. Verifiez le resultat sous forme de couples (USAFID, temperature). Note : les temperatures sont en dixiemes de degres
// 3. Analysez le code present dans la methode main()
// 4. Analysez le code present dans src/main/java/NCDCData.java
// 5. Decommentez le code supprimant les messages d'information et reexecutez le programme
object Exercice1 {
def main(args: Array[String]) {
// Suppression d'une partie des messages d'information tres verbeux
// Decommentez ce code des que possible
/*
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
*/
// En dehors de spark-shell, il faut declarer manuellement le contexte
val conf = new SparkConf().setAppName("demo")
val sc = new SparkContext(conf)
// Verification du chemin en entree
if(args.length != 1) {
println("Usage: Exercice1 ")
System.exit(1)
}
// Chargement des fichiers du NCDC
val lines = sc.textFile(args(0))
// Interpretation de chaque ligne pour creer une entree NCDCData
val records = lines.map(s => new NCDCData(s))
// Filtrage des donnees invalides
val filtered = records.filter(data => (data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]")))
// Creation des couples (ID, temperature)
val tuples = filtered.map(data => (data.USAFID, data.airTemperature))
// Calcul de la temperature maximale par cle
val maxTemps = tuples.reduceByKey((t1, t2) => math.max(t1, t2))
// Affichage des resultats. Selon la taille du jeu de donnees, on utilisera collect (toutes les valeurs)
// ou take(X) (limitation a X valeurs)
//maxTemps.take(100).foreach(println)
maxTemps.sortByKey(true).collect().foreach(println)
}
}
TPSpark/src/main/scala/Exercice5.scala 0000644 0072212 0000144 00000004320 12676465356 020566 0 ustar julien.nauroy users import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Exercice 5 : Completez le code a trous ci-dessous de maniere a calculer la temperature maximale par nom de station.
object Exercice5 {
def main(args: Array[String]) {
// Suppression d'une partie des messages d'information tres verbeux
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
// En dehors de spark-shell, il faut declarer manuellement le contexte
val conf = new SparkConf().setAppName("demo")
val sc = new SparkContext(conf)
// Verification du chemin en entree
if(args.length != 1) {
println("Usage: Exercice5 ")
System.exit(1)
}
// Chargement des fichiers du NCDC
val lines = sc.textFile(args(0))
// Interpretation de chaque ligne pour creer une entree NCDCData
val records = lines.map(s => new NCDCData(s))
// Filtrage des donnees invalides
val filtered = records.filter(data => (data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]")))
// Creation des couples (USAFID, temperature)
// TODO
// Calcul de la temperature maximale par cle
// TODO
// Chargement des donnees depuis le repertoire HDFS /ncdc/isd-history.csv
// TODO
// Les donnees sont de la forme "949999","00338","PORTLAND (CASHMORE)","AS","","","-38.320","+141.480","+0081.0","19690724","19781113"
// Les champs d'interet sont le premier (USAFID) et le 3e (nom de station)
// Recuperation des couples (USAFID, nom de station)
// Note : le decoupage se fait via split(',') ; on peut suprimer les guillemets avec une commande du type val.drop(1).dropRight(1)
// TODO
// Jointure entre les donnees de temperature et de noms de station
// La cle commune est l'USAFID
// Spark permet de joindre des RDD de facon tres simple a l'aide de la methode join qui cree a partir de tuples de type (K, U) et (K, V), des tuples de type (K, (U, V)).
// TODO
// Recuperation des tuples (nom de station, temperature maximale) a partir du resultat ci-dessus via une operation map()
// TODO
// Affichage des resultats
// TODO
}
}
TPSpark/src/main/java/ 0000755 0072212 0000144 00000000000 12675426045 015573 5 ustar julien.nauroy users TPSpark/src/main/java/NCDCData.java 0000755 0072212 0000144 00000002125 12675315413 017736 0 ustar julien.nauroy users import java.io.Serializable;
public class NCDCData implements Serializable {
public String USAFID;
// Note : le NCDCID n'est pas toujours defini (en particulier debut 1900)
public String NCDCID;
public int year;
public int month;
public int day;
public int hour;
public int minute;
public String stationName;
// Note : la temperature est exprimee en dixiemes de degres
public int airTemperature;
public String airTemperatureQuality;
public NCDCData(String s) {
this.USAFID = s.substring(4, 10);
this.NCDCID = s.substring(10, 15);
this.year = Integer.parseInt(s.substring(15, 19));
this.month = Integer.parseInt(s.substring(19, 21));
this.day = Integer.parseInt(s.substring(21, 23));
this.hour = Integer.parseInt(s.substring(23, 25));
this.minute = Integer.parseInt(s.substring(25, 27));
this.stationName = s.substring(51, 56);
this.airTemperature = Integer.parseInt(s.substring(87, 92));
this.airTemperatureQuality = s.substring(92, 93);
}
}
TPSpark/src/main/java/SparkJavaDemo.java 0000755 0072212 0000144 00000005034 12675247753 021141 0 ustar julien.nauroy users import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Map;
public class SparkJavaDemo {
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Usage: SparkJavaDemo ");
System.exit(-1);
}
SparkConf conf = new SparkConf().setAppName("SparkJavaDemo");
JavaSparkContext sc = new JavaSparkContext(conf);
// Chargement des fichiers du NCDC
JavaRDD lines = sc.textFile(args[0]);
// Interpretation de chaque ligne pour creer une entree NCDCData
JavaRDD records = lines.map(new Function() {
@Override public NCDCData call(String s) {
return new NCDCData(s);
}});
// Filtrage des donnees invalides
JavaRDD filtered = records.filter(new Function() {
@Override public Boolean call(NCDCData data) {
return data.airTemperature != 9999 && data.airTemperatureQuality.matches("[01459]");
}
});
// Creation des couples (ID, temperature)
JavaPairRDD tuples = filtered.mapToPair(
new PairFunction() {
@Override public Tuple2 call(NCDCData data) {
return new Tuple2(
data.USAFID, data.airTemperature);
}
}
);
// Calcul de la temperature maximale par cle
JavaPairRDD maxTemps = tuples.reduceByKey(
new Function2() {
@Override public Integer call(Integer i1, Integer i2) {
return Math.max(i1, i2);
}
}
);
// Affichage des resultats. Selon la taille du jeu de donnees, on utilisera collectAsMap (toutes les valeurs)
// ou take(X) (limitation a X valeurs)
Map results = maxTemps.collectAsMap();
for(String key: results.keySet()) {
System.out.format("(%s,%d)%n", key, results.get(key));
}
}
}
TPSpark/run.sh 0000755 0072212 0000144 00000002175 12676440036 014304 0 ustar julien.nauroy users #!/bin/bash
# Check if we have at least one paremeter
if [ $# -ne 2 ];
then echo "format : $0 className HDFSdirectory [args]"
exit 1
fi
# Compile automatically if the jar file is missing
# Or a source file has been modified since last compiling
if [ ! -f target/TPSpark-1.0.jar ]; then
echo ===========
echo TPSpark-1.0.jar does not exist. Compiling...
echo ===========
mvn package
if [ $? -ne 0 ]; then exit; fi
fi
jarmodified=`stat -c "%Y" target/TPSpark-1.0.jar`
scriptmodified=`find src/ -printf "%T@+\n" | sort -rn | head -n 1`
if [[ $jarmodified < $scriptmodified ]]; then
echo ===========
echo Source file changed. Recompiling...
echo ===========
mvn package
if [ $? -ne 0 ]; then exit; fi
fi
class=$1
dir=$2
shift 2
echo ===========
echo execution
echo ===========
sleep 1
# Run spark-submit
spark-submit --conf spark.executor.instances=2 --executor-cores 1 --executor-memory 1G --class $class target/TPSpark-1.0.jar $dir $*
#spark-submit --conf spark.dynamicAllocation.maxExecutors=2 --executor-cores 1 --executor-memory 2G --class fr.upsud.hadoop.$class target/TPSpark-1.0.jar $dir $*