赞
踩
1.HDFS文件创建和操作步骤
step1:获取FileSystem对象;
step2:通过FSDataOutputStream进行写入;
step3:通过FSDataInputStream将文件内容输出。
/user/hadoop/myfile
;myfile
文件中添加字符串https://www.educoder.net
;myfile
文件中的内容,并输出。- import java.io.*;
- import java.sql.Date;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
-
- public class hdfs {
-
- public static void main(String[] args) throws IOException {
- //请在 Begin-End 之间添加代码,完成任务要求。
- /********* Begin *********/
- //获取FileSystem对象
- Configuration conf=new Configuration(); //实现hadoop各模块之间值的传递
- FileSystem fs=FileSystem.get(conf); //获取文件系统
- Path file=new Path("/user/hadoop/myfile"); //创建文件
- //通过FSDataOutputStream进行写入
- FSDataOutputStream outStream=fs.create(file); //获取输出流
- outStream.writeUTF("https://www.educoder.net"); //可写入任意字符
- outStream.close(); //记得关闭输出流
- //通过FSDataInputStream将文件内容输出
- FSDataInputStream inStream=fs.open(file); //获取输入流
- String data=inStream.readUTF(); //读取文件
-
- /********* End *********/
-
- }
- }
-

完成向HDFS中上传文本文件,如果指定的文件在HDFS中已经存在,由用户指定是追加到原有文件末尾还是覆盖原有的文件。
- import java.io.*;
- import java.sql.Date;
- import java.util.Scanner;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
-
- public class hdfs {
-
- /**
- * 判断路径是否存在
- */
- public static boolean test(Configuration conf, String path) throws IOException {
- /*****start*****/
- //请在此处编写判断文件是否存在的代码
- FileSystem fs = FileSystem.get(conf);//获取对象
- fs.exists(new Path(path)); //判断该路径的文件是否存在,是则返回true
- return fs.exists(new Path(path));
-
- /*****end*****/
- }
-
- /**
- * 复制文件到指定路径
- * 若路径已存在,则进行覆盖
- */
- public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
- /*****start*****/
- //请在此处编写复制文件到指定路径的代码
- FileSystem fs = FileSystem.get(conf);//获取对象
- Path localPath=new Path(localFilePath);
- Path remotePath=new Path(remoteFilePath);
- /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */
- fs.copyFromLocalFile(false,true,localPath,remotePath);
-
- /*****end*****/
- }
-
- /**
- * 追加文件内容
- */
- public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
- /*****start*****/
- //请在此处编写追加文件内容的代码
- FileSystem fs=FileSystem.get(conf);
- Path remotePath=new Path(remoteFilePath);
- //创建一个文件读入流
- FileInputStream in=new FileInputStream(localFilePath);
- //创建一个文件输出流。输出内容将追加到文件末尾
- FSDataOutputStream out=fs.append(remotePath);
- //读写文件内容
- byte[] data=new byte[1024];
- int read=-1;
- while((read=in.read(data))>0){
- out.write(data,0,read);
- }
-
- /*****end*****/
- }
-
- /**
- * 主函数
- */
- public static void main(String[] args)throws IOException {
- Configuration conf = new Configuration();
-
- createHDFSFile(conf);
-
- String localFilePath = "./file/text.txt"; // 本地路径
- String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
- String choice = "";
-
- try {
- /* 判断文件是否存在 */
- Boolean fileExists = false;
- if (hdfs.test(conf, remoteFilePath)) {
- fileExists = true;
- System.out.println(remoteFilePath + " 已存在.");
- choice = "append"; //若文件存在则追加到文件末尾
- } else {
- System.out.println(remoteFilePath + " 不存在.");
- choice = "overwrite"; //覆盖
- }
-
- /*****start*****/
- //请在此处编写文件不存在则上传 文件choice等于overwrite则覆盖 choice 等于append 则追加的逻辑
-
- if (fileExists!=true ) { // 文件不存在,则上传
-
- System.out.println(localFilePath + " 已上传至 " + remoteFilePath);
- } else if (choice=="overwrite" ) { // 选择覆盖
-
- System.out.println(localFilePath + " 已覆盖 " + remoteFilePath);
- } else if ( choice=="append") { // 选择追加
-
- System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
- }
-
-
- /*****end*****/
-
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- //创建HDFS文件
- public static void createHDFSFile(Configuration conf)throws IOException{
- FileSystem fs = FileSystem.get(conf); //获取文件系统
- Path file = new Path("/user/hadoop/text.txt"); //创建文件
- FSDataOutputStream outStream = fs.create(file); //获取输出流
- outStream.writeUTF("hello##Hdfs");
- outStream.close();
- fs.close();
- }
-
-
- }

完成从HDFS中下载文件的功能
- import java.io.*;
- import java.sql.Date;
- import java.util.Scanner;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
-
- public class hdfs {
- /**
- * 下载文件到本地
- * 判断本地路径是否已存在,若已存在,则自动进行重命名
- */
- public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path remotePath = new Path(remoteFilePath);
- File f = new File(localFilePath);
- /*****start*****/
- /*在此添加判断文件是否存在的代码,如果文件名存在,自动重命名(在文件名后面加上 _0, _1 ...) */
- if (fs.exists(new Path(localFilePath))) {
- System.out.println(localFilePath + " 已存在.");
- Integer i = 0;
- while ( f.exists() ) {
- f = new File( localFilePath );
- if ( f.exists() ) {
- localFilePath = localFilePath + "_" + i ;
- break;
- }
- }
- System.out.println("将重新命名为: " + localFilePath);
- }
-
- /*****end*****/
-
- /*****start*****/
- // 在此添加将文件下载到本地的代码
- Path localPath=new Path(localFilePath);
- fs.copyToLocalFile(remotePath,localPath);
-
- /*****end*****/
- fs.close();
- }
-
- /**
- * 主函数
- */
- public static void main(String[] args)throws IOException {
- Configuration conf = new Configuration();
- createHDFSFile(conf);
- String localFilePath = "/tmp/output/text.txt"; // 本地路径
- String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
-
- try {
- //调用方法下载文件至本地
- hdfs.copyToLocal(conf, remoteFilePath, localFilePath);
- System.out.println("下载完成");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- //创建HDFS文件
- public static void createHDFSFile(Configuration conf)throws IOException{
- FileSystem fs = FileSystem.get(conf); //获取文件系统
- Path file = new Path("/user/hadoop/text.txt"); //创建文件
- FSDataOutputStream outStream = fs.create(file); //获取输出流
- outStream.writeUTF("hello hadoop HDFS www.educoder.net");
- outStream.close();
- fs.close();
- }
-
- }

1.使用字符流读取数据分为三步:
step1:通过Configuration
对象获取FileSystem
对象;
step2:通过fs
获取FSDataInputStream
对象;
step3:通过字符流循环读取文件中数据并输出。
完成将HDFS中指定文件输出到指定文件中
- import java.io.*;
- import java.sql.Date;
- import java.util.Scanner;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
-
- public class hdfs {
- /**
- * 读取文件内容
- */
- public static void cat(Configuration conf, String remoteFilePath) throws IOException {
-
-
- /*****start*****/
- //1.读取文件中的数据
- FileSystem fs=FileSystem.get(conf);
- Path remotePath=new Path(remoteFilePath);
- FSDataInputStream in=fs.open(remotePath);
- BufferedReader d=new BufferedReader(new InputStreamReader(in));
- StringBuffer buffer=new StringBuffer();
- String line=null;
- while((line=d.readLine())!=null)
- {
- buffer.append(line);
- }
- String res = buffer.toString();
-
-
- //2.将读取到的数据输出到 /tmp/output/text.txt 文件中 提示:可以使用FileWriter
- FileWriter f1=new FileWriter("/tmp/output/text.txt");
- f1.write(res);
- f1.close();
-
-
- /*****end*****/
- }
-
- /**
- * 主函数
- */
- public static void main(String[] args)throws IOException {
- Configuration conf = new Configuration();
- createHDFSFile(conf);
- String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
-
- try {
- System.out.println("读取文件: " + remoteFilePath);
- hdfs.cat(conf, remoteFilePath);
- System.out.println("\n读取完成");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- //创建HDFS文件
- public static void createHDFSFile(Configuration conf)throws IOException{
- FileSystem fs = FileSystem.get(conf); //获取文件系统
- Path file = new Path("/user/hadoop/text.txt"); //创建文件
- FSDataOutputStream outStream = fs.create(file); //获取输出流
- outStream.writeUTF("hello hadoop HDFS step4 www.educoder.net");
- outStream.close();
- fs.close();
- }
-
-
- }

1.public boolean delete(Path f, Boolean recursive)
永久性删除指定的文件或目录,如果f
是一个空目录或者文件,那么recursive
的值就会被忽略。只有recursive=true
时,一个非空目录及其内容才会被删除(即递归删除所有文件)。
删除HDFS中/user/hadoop/text.txt
文件
- import java.io.*;
- import java.sql.Date;
- import java.util.Scanner;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
-
-
- public class hdfs {
-
- /**
- * 删除文件
- */
- public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
- /*****start*****/
- //请在此添加删除文件的代码
- FileSystem fs=FileSystem.get(conf);
- Path remotePath=new Path(remoteFilePath);
- boolean result=fs.delete(remotePath,false);
-
- return result ;
-
-
- /*****end*****/
- }
-
- /**
- * 主函数
- */
- public static void main(String[] args) {
- Configuration conf = new Configuration();
- String remoteFilePath = "/user/hadoop/text.txt"; // HDFS文件
-
- try {
- if (rm(conf, remoteFilePath) ) {
- System.out.println("文件删除: " + remoteFilePath);
- } else {
- System.out.println("操作失败(文件不存在或删除失败)");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- }

1.验证目录下是否存在文件:public RemoteIterator<LocatedFileStatus> listFiles(Path f, Boolean recursive),recursive
是false
,则返回目录中的文件;如果recursive
是true
,则在根目录中返回文件。
2.删除HDFS中的文件和目录:public boolean delete(Path f, Boolean recursive)
删除HDFS中/user/hadoop/tmp
目录和/user/hadoop/dir
目录,删除前,需要判断两个目录是否为空,若不为空则不删除,否则删除。
- import java.io.*;
- import java.sql.Date;
- import java.util.Scanner;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.*;
-
-
- public class hdfs {
-
-
- /**
- * 判断目录是否为空
- * true: 空,false: 非空
- */
- public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException {
- /*****start*****/
- //请在此添加判断目录是否为空的代码
- FileSystem fs=FileSystem.get(conf);
- Path dirPath=new Path(remoteDir);
- RemoteIterator<LocatedFileStatus> result=fs.listFiles(dirPath,true);
-
- return !result.hasNext() ;
- /*****end*****/
- }
-
- /**
- * 删除目录
- */
- public static boolean rmDir(Configuration conf, String remoteDir, boolean recursive) throws IOException {
- /*****start*****/
- //请在此添加删除目录的代码
- FileSystem fs=FileSystem.get(conf);
- Path dirPath=new Path(remoteDir);
- boolean result=fs.delete(dirPath,false);
- return result ;
- /*****end*****/
-
- }
-
- /**
- * 主函数
- */
- public static void main(String[] args) {
- Configuration conf = new Configuration();
- String remoteDir = "/user/hadoop/dir/"; // HDFS目录
- String remoteDir1 = "/user/hadoop/tmp/"; // HDFS目录
- Boolean forceDelete = false; // 是否强制删除
-
- try {
- if ( !isDirEmpty(conf, remoteDir) && !forceDelete ) {
- System.out.println("目录不为空,不删除");
- } else {
- if ( rmDir(conf, remoteDir, forceDelete) ) {
- System.out.println("目录已删除: " + remoteDir);
- } else {
- System.out.println("操作失败");
- }
- }
-
- if ( !isDirEmpty(conf, remoteDir1) && !forceDelete ) {
- System.out.println("目录不为空,不删除");
- } else {
- if ( rmDir(conf, remoteDir1, forceDelete) ) {
- System.out.println("目录已删除: " + remoteDir1);
- } else {
- System.out.println("操作失败");
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- }

1.相关方法
public int read(char[] cbuf,int off,int len)throws IOException
(1)已经读取了指定的字符数, 底层流的 read
方法返回 -1
,指示文件末尾(end-of-file
),或者底层流的 ready
方法返回 false
,指示将阻塞后续的输入请求。
(2) 如果第一次对底层流调用 read
返回 -1
(指示文件末尾),则此方法返回 -1
,否则此方法返回实际读取的字符数。
实现按行读取HDFS中指定文件的方法readLine()
,如果读到文件末尾,则返回空,否则返回文件一行的文本,即实现和BufferedReader
类的readLine()
方法类似的效果。
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import java.io.*;
-
- public class MyFSDataInputStream extends FSDataInputStream {
- public MyFSDataInputStream(InputStream in) {
- super(in);
- }
-
- /**
- * 实现按行读取
- * 每次读入一个字符,遇到"\n"结束,返回一行内容
- */
- public static String readline(BufferedReader br) throws IOException {
- /*****start*****/
- //请在此处填充代码实现相关功能
- char[] data = new char[1024];
- int read = -1;
- int off = 0; // 循环执行时,br 每次会从上一次读取结束的位置继续读取,因此该函数里,off 每次都从0开始
- while ( (read = br.read(data, off, 1)) != -1 ) {
- if (String.valueOf(data[off]).equals("\n") ) {
- off += 1;
- return String.valueOf(data, 0, read);
- }
- off += 1;
- return String.valueOf(data, 0, read);
- }
-
- return null ;
- /*****end*****/
-
- }
-
- /**
- * 读取文件内容
- */
- public static void cat(Configuration conf, String remoteFilePath) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path remotePath = new Path(remoteFilePath);
- FSDataInputStream in = fs.open(remotePath);
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- FileWriter f = new FileWriter("/tmp/output/text.txt");
- String line = null;
- while ( (line = MyFSDataInputStream.readline(br)) != null ) {
- f.write(line);
- }
- f.close();
- br.close();
- in.close();
- fs.close();
- }
-
- /**
- * 主函数
- */
- public static void main(String[] args) {
- Configuration conf = new Configuration();
- String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
- try {
- MyFSDataInputStream.cat(conf, remoteFilePath);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。