Getting Started with Hadoop: Finding the Maximum Temperature

Last month I attended a Spark training 📒, and with no big-data background I understood almost nothing. Two weeks ago I bought Hadoop: The Definitive Guide and Programming Hive 📚, planning to study big data properly. Then I got stuck for a long time on the very first weather-data 🌡 example, so after finally working through it today, here are my notes 📝.

Preparing the Data

Downloading the data from the US 🇺🇸 weather center is slow, and it still needs preprocessing afterwards, which is a hassle. So I simply wrote ✍🏻 a script that generates a file 📃 already in the final format I want. The Python script:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random
import datetime

# start year
start = 1900
# end year
end = 1950
# directory where the generated file is written
file_path = "/Users/chenxii/Documents/BigData/climate-data/"


# Generate a random temperature between -50°C and 50°C, keeping one decimal place
def generate_temperature():
    temp = (random.random() - 0.5) * 100
    temp = int(temp * 10) / 10
    if temp > 0:
        return "+" + str(temp)
    else:
        return str(temp)


# Generate the temperature file; data for all years goes into a single file
def generate_file1(start_year, end_year, save_path):
    file_name = 'temp.txt'
    file = open(save_path + file_name, 'w')
    for year in range(start_year, end_year):
        date = datetime.date(int(year), 1, 1)
        end_date = datetime.date(int(year), 12, 31)
        while date <= end_date:
            temp = generate_temperature()
            file.write(str(date) + " " + str(temp) + '\n')
            date += datetime.timedelta(days=1)
    file.flush()
    file.close()


if __name__ == '__main__':
    # generate_file(start, end, file_path)
    generate_file1(start, end, file_path)
    

Make sure the output directory exists, then run the script directly with the python command. The generated data looks like this:

1900-01-20 +11.8

One record per day: characters 0-3 are the year, character 11 is the sign marking above or below zero, and everything from character 12 to the end of the line is the temperature value. When the value is 0 there is no '+' or '-' sign.
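To make those offsets concrete, here is a quick standalone check (just a sketch, not part of the job) that pulls the year, sign, and value out of one record; the Mapper below relies on exactly these positions:

public class RecordFormatCheck {
    public static void main(String[] args) {
        String line = "1900-01-20 +11.8";   // one generated record
        String year = line.substring(0, 4); // "1900"
        char sign = line.charAt(11);        // '+' or '-' (absent only when the value is 0)
        String value = line.substring(12);  // "11.8"
        System.out.println(year + " " + sign + " " + value);
    }
}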

Upload the file to HDFS with hdfs dfs -put temp.txt /user/hadoop/climate/temp.txt.
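If /user/hadoop/climate does not exist yet, create it first with hdfs dfs -mkdir -p /user/hadoop/climate; otherwise the put may fail with a "No such file or directory" error.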

Writing the Java Program

You need to create a new Maven project, build it into a jar, and upload the jar to the VM. The code consists of three classes, listed below in order: the Mapper, the Reducer, and the job driver. First, MaxTemperatureMapper:

package finqo.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author [email protected]
 * @since 2021/8/22 12:41 PM
 */
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(0, 4);
        // temperatures are scaled by 10 so that one decimal place fits in an int
        int temperature = 0;
        char sign = line.charAt(11);
        if (sign == '+' || sign == '-') {
            // parse from the sign itself so negative readings keep their sign
            double tmp = Double.parseDouble(line.substring(11)) * 10;
            temperature = (int) tmp;
        }
        context.write(new Text(year), new IntWritable(temperature));
    }
}

The Reducer, MaxTemperatureReducer:

package finqo.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author [email protected]
 * @since 2021/8/22 1:09 PM
 */
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

The job driver, MaxTemperature:

package finqo.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

/**
 * @author [email protected]
 * @since 2021/8/22 1:15 PM
 */
public class MaxTemperature extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        return 0;
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        conf.set("mapred.jar", "flinqo-hadoop-1.0.jar");
        Job job = Job.getInstance(conf);
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
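
A side note: MaxTemperature implements Tool, but run() is left empty and main() never goes through it. If you want the generic Hadoop options (-D, -files, and so on) parsed for you, the usual pattern is to move the job setup into run() and launch it through ToolRunner. A minimal sketch of that variant (a hypothetical MaxTemperatureDriver, not the class used for the results below):

package finqo.hadoop;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical variant of the driver: the job setup lives in run(), so
// ToolRunner can parse generic Hadoop options before the job is submitted.
public class MaxTemperatureDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureDriver <input path> <output path>");
            return -1;
        }
        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(MaxTemperatureDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MaxTemperatureDriver(), args));
    }
}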

The jar is built with mvn package and then uploaded. The pom.xml needs to include the following:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinqo.hadoop</groupId>
    <artifactId>flinqo-hadoop</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.4</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Without this, running the jar fails with a "main class not found" error, because a plain mvn package does not write a Main-Class entry into the jar's manifest; the repackage goal does.

Running the Program

Run the job with:

hadoop jar flinqo-hadoop-1.0.jar /user/hadoop/climate/temp.txt /user/hadoop/result
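One thing to watch: the job refuses to start if the output directory already exists, so when re-running, delete it first with hdfs dfs -rm -r /user/hadoop/result.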

When it finishes, check the result (the output path is a directory; the reducer's output lands in the part-r-00000 file inside it):

hdfs dfs -cat /user/hadoop/result/part-r-00000

The result:

1900	499
1901	495
1902	499
1903	498
1904	493
1905	499
1906	498
1907	494
1908	499
1909	497
1910	499
1911	497
1912	499
1913	493
1914	498
1915	497
1916	497
1917	499
1918	493
1919	498
1920	499
1921	499
1922	499
1923	497
1924	499
1925	498
1926	499
1927	499
1928	499
1929	498
1930	498
1931	499
1932	497
1933	495
1934	499
1935	498
1936	498
1937	498
1938	496
1939	497
1940	495
1941	499
1942	497
1943	499
1944	499
1945	499
1946	499
1947	499
1948	499
1949	497

Because the temperatures were scaled up by 10, the numbers look large: 499 corresponds to a maximum of 49.9°C.
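
If you would rather see real degrees in the output, a small variant of the reducer could divide by 10 before writing (a hypothetical sketch, not what produced the output above; the driver would also need job.setOutputValueClass(DoubleWritable.class)):

package finqo.hadoop;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Hypothetical variant reducer: emits the maximum in degrees Celsius
// (scaled value / 10.0) instead of the scaled integer.
public class MaxTemperatureDegreesReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new DoubleWritable(maxValue / 10.0));
    }
}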