Last month I attended a Spark training 📒. Having no big-data background, I understood almost nothing. Two weeks ago I bought *Hadoop: The Definitive Guide* and *Programming Hive* 📚 to study big data properly. I then got stuck for a long time on the very first weather-data 🌡 example, so today I worked through it and am writing it down 📝.
Preparing the data
Downloading the US 🇺🇸 weather-center data is slow, and the files need preprocessing afterwards, which is a hassle. So I simply wrote ✍🏻 a script that generates a file 📃 already in the final format I want. The Python script:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import random

# start year
start = 1900
# end year (exclusive)
end = 1950
# output directory
file_path = "/Users/chenxii/Documents/BigData/climate-data/"


def generate_temperature():
    """Generate a random temperature between -50 and +50 °C, one decimal place."""
    temp = (random.random() - 0.5) * 100
    temp = int(temp * 10) / 10
    if temp > 0:
        return "+" + str(temp)
    else:
        # negative values already carry '-'; zero has no sign
        return str(temp)


def generate_file1(start_year, end_year, save_path):
    """Write one temperature per day for every year into a single file."""
    file_name = 'temp.txt'
    with open(save_path + file_name, 'w') as file:
        for year in range(start_year, end_year):
            date = datetime.date(year, 1, 1)
            end_date = datetime.date(year, 12, 31)
            while date <= end_date:
                temp = generate_temperature()
                file.write(str(date) + " " + temp + '\n')
                date += datetime.timedelta(days=1)


if __name__ == '__main__':
    generate_file1(start, end, file_path)
Make sure the output directory exists, then run the script with python3. Each generated record looks like:
1900-01-20 +11.8
One record per day: characters 0–3 are the year, character 11 is the '+' or '-' sign, and characters 12 onward are the temperature value. A value of exactly 0 has no '+' or '-' sign.
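Those fixed offsets can be sanity-checked with a small sketch in plain Python (parse_record is a name made up for this illustration; it mirrors the position-based parsing the Java mapper performs, scaling to tenths of a degree):

```python
def parse_record(line):
    """Parse one record into (year, temperature in tenths of a degree)."""
    year = line[0:4]
    temperature = 0
    if line[11] in ('+', '-'):
        # keep the sign so that negative readings stay negative
        temperature = int(float(line[11:]) * 10)
    return year, temperature

print(parse_record("1900-01-20 +11.8"))  # ('1900', 118)
print(parse_record("1900-01-21 -3.5"))   # ('1900', -35)
```

Note that the slice starts at index 11 (the sign), not 12; dropping the sign would silently turn negative readings positive.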
Upload the file to HDFS with:
hdfs dfs -put temp.txt /user/hadoop/climate/temp.txt
Writing the Java program
Create a Maven project, package it as a jar, and upload it to the VM. The code:
package finqo.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author [email protected]
 * @since 2021/8/22 12:41 PM
 */
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(0, 4);
        int temperature = 0;
        if ('+' == line.charAt(11) || '-' == line.charAt(11)) {
            // substring(11) keeps the sign, so negative readings stay negative
            double tmp = Double.parseDouble(line.substring(11)) * 10;
            temperature = (int) tmp;
        }
        context.write(new Text(year), new IntWritable(temperature));
    }
}
package finqo.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author [email protected]
 * @since 2021/8/22 1:09 PM
 */
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
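Before submitting the jar, the map/group/reduce flow can be dry-run locally with a small Python sketch (run_max_temperature and records are names made up for this illustration):

```python
from collections import defaultdict

def run_max_temperature(lines):
    """Simulate the job: group temperatures (in tenths of a degree) by year, keep the max."""
    grouped = defaultdict(list)
    # "map" phase: emit (year, temperature) pairs, grouped by key as the shuffle would
    for line in lines:
        year = line[0:4]
        if line[11] in ('+', '-'):
            grouped[year].append(int(float(line[11:]) * 10))
        else:
            grouped[year].append(0)
    # "reduce" phase: max per key
    return {year: max(temps) for year, temps in grouped.items()}

records = [
    "1900-01-20 +11.8",
    "1900-01-21 -3.5",
    "1901-01-01 +25.0",
    "1901-01-02 +7.2",
]
print(run_max_temperature(records))  # {'1900': 118, '1901': 250}
```

This is only a mental model: in the real job, grouping by key happens in the shuffle between the Mapper and Reducer above.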
package finqo.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

/**
 * @author [email protected]
 * @since 2021/8/22 1:15 PM
 */
public class MaxTemperature extends Configured implements Tool {

    public int run(String[] strings) throws Exception {
        // Tool#run is unused here; main() drives the job directly
        return 0;
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        conf.set("mapred.jar", "flinqo-hadoop-1.0.jar");

        Job job = Job.getInstance(conf);
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Package the jar with:
mvn package
Then upload it. The pom needs to include:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinqo.hadoop</groupId>
    <artifactId>flinqo-hadoop</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.4</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Without the repackage plugin, running the jar fails with a "main class not found" error.
Running the program
Run:
hadoop jar flinqo-hadoop-1.0.jar /user/hadoop/climate/temp.txt /user/hadoop/result
After the job finishes, check the result (the output path is a directory; the data is in the part file):
hdfs dfs -cat /user/hadoop/result/part-r-00000
结果:
1900 499
1901 495
1902 499
1903 498
1904 493
1905 499
1906 498
1907 494
1908 499
1909 497
1910 499
1911 497
1912 499
1913 493
1914 498
1915 497
1916 497
1917 499
1918 493
1919 498
1920 499
1921 499
1922 499
1923 497
1924 499
1925 498
1926 499
1927 499
1928 499
1929 498
1930 498
1931 499
1932 497
1933 495
1934 499
1935 498
1936 498
1937 498
1938 496
1939 497
1940 495
1941 499
1942 497
1943 499
1944 499
1945 499
1946 499
1947 499
1948 499
1949 497
The values look large because they were scaled up by a factor of 10 to keep them integral.
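To read the output as degrees Celsius, divide by 10. A quick sketch, using the first few (year, value) pairs from the result above:

```python
# Job output pairs: (year, max temperature x10)
results = [("1900", 499), ("1901", 495), ("1902", 499)]

# Undo the x10 scaling applied in the mapper
for year, scaled in results:
    print(f"{year} {scaled / 10}°C")  # e.g. "1900 49.9°C"
```

So a value of 499 means a maximum of 49.9 °C, which matches the -50 to +50 range the generator script draws from.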