当前位置:   article > 正文

HDFS Java API编程_hdfs java api编程 ——文件读写

hdfs java api编程 ——文件读写

第1关:文件读写

知识点

1.HDFS文件创建和操作步骤

step1:获取FileSystem对象;

step2:通过FSDataOutputStream进行写入;

step3:通过FSDataInputStream将文件内容输出。

编程要求

  • 获取hadoop的系统设置,并在其中创建HDFS文件,文件路径为/user/hadoop/myfile
  • myfile文件中添加字符串https://www.educoder.net
  • 读取刚刚创建myfile文件中的内容,并输出。
  1. import java.io.*;
  2. import java.sql.Date;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FSDataInputStream;
  5. import org.apache.hadoop.fs.FSDataOutputStream;
  6. import org.apache.hadoop.fs.FileStatus;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. public class hdfs {
  10. public static void main(String[] args) throws IOException {
  11. //请在 Begin-End 之间添加代码,完成任务要求。
  12. /********* Begin *********/
  13. //获取FileSystem对象
  14. Configuration conf=new Configuration(); //实现hadoop各模块之间值的传递
  15. FileSystem fs=FileSystem.get(conf); //获取文件系统
  16. Path file=new Path("/user/hadoop/myfile"); //创建文件
  17. //通过FSDataOutputStream进行写入
  18. FSDataOutputStream outStream=fs.create(file); //获取输出流
  19. outStream.writeUTF("https://www.educoder.net"); //可写入任意字符
  20. outStream.close(); //记得关闭输出流
  21. //通过FSDataInputStream将文件内容输出
  22. FSDataInputStream inStream=fs.open(file); //获取输入流
  23. String data=inStream.readUTF(); //读取文件
  24. /********* End *********/
  25. }
  26. }

第2关:文件上传

编程要求

完成向HDFS中上传文本文件,如果指定的文件在HDFS中已经存在,由用户指定是追加到原有文件末尾还是覆盖原有的文件。

  1. import java.io.*;
  2. import java.sql.Date;
  3. import java.util.Scanner;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.FSDataInputStream;
  6. import org.apache.hadoop.fs.FSDataOutputStream;
  7. import org.apache.hadoop.fs.FileStatus;
  8. import org.apache.hadoop.fs.FileSystem;
  9. import org.apache.hadoop.fs.Path;
  10. public class hdfs {
  11. /**
  12. * 判断路径是否存在
  13. */
  14. public static boolean test(Configuration conf, String path) throws IOException {
  15. /*****start*****/
  16. //请在此处编写判断文件是否存在的代码
  17. FileSystem fs = FileSystem.get(conf);//获取对象
  18. fs.exists(new Path(path)); //判断该路径的文件是否存在,是则返回true
  19. return fs.exists(new Path(path));
  20. /*****end*****/
  21. }
  22. /**
  23. * 复制文件到指定路径
  24. * 若路径已存在,则进行覆盖
  25. */
  26. public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
  27. /*****start*****/
  28. //请在此处编写复制文件到指定路径的代码
  29. FileSystem fs = FileSystem.get(conf);//获取对象
  30. Path localPath=new Path(localFilePath);
  31. Path remotePath=new Path(remoteFilePath);
  32. /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */
  33. fs.copyFromLocalFile(false,true,localPath,remotePath);
  34. /*****end*****/
  35. }
  36. /**
  37. * 追加文件内容
  38. */
  39. public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
  40. /*****start*****/
  41. //请在此处编写追加文件内容的代码
  42. FileSystem fs=FileSystem.get(conf);
  43. Path remotePath=new Path(remoteFilePath);
  44. //创建一个文件读入流
  45. FileInputStream in=new FileInputStream(localFilePath);
  46. //创建一个文件输出流。输出内容将追加到文件末尾
  47. FSDataOutputStream out=fs.append(remotePath);
  48. //读写文件内容
  49. byte[] data=new byte[1024];
  50. int read=-1;
  51. while((read=in.read(data))>0){
  52. out.write(data,0,read);
  53. }
  54. /*****end*****/
  55. }
  56. /**
  57. * 主函数
  58. */
  59. public static void main(String[] args)throws IOException {
  60. Configuration conf = new Configuration();
  61. createHDFSFile(conf);
  62. String localFilePath = "./file/text.txt"; // 本地路径
  63. String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
  64. String choice = "";
  65. try {
  66. /* 判断文件是否存在 */
  67. Boolean fileExists = false;
  68. if (hdfs.test(conf, remoteFilePath)) {
  69. fileExists = true;
  70. System.out.println(remoteFilePath + " 已存在.");
  71. choice = "append"; //若文件存在则追加到文件末尾
  72. } else {
  73. System.out.println(remoteFilePath + " 不存在.");
  74. choice = "overwrite"; //覆盖
  75. }
  76. /*****start*****/
  77. //请在此处编写文件不存在则上传 文件choice等于overwrite则覆盖 choice 等于append 则追加的逻辑
  78. if (fileExists!=true ) { // 文件不存在,则上传
  79. System.out.println(localFilePath + " 已上传至 " + remoteFilePath);
  80. } else if (choice=="overwrite" ) { // 选择覆盖
  81. System.out.println(localFilePath + " 已覆盖 " + remoteFilePath);
  82. } else if ( choice=="append") { // 选择追加
  83. System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
  84. }
  85. /*****end*****/
  86. } catch (Exception e) {
  87. e.printStackTrace();
  88. }
  89. }
  90. //创建HDFS文件
  91. public static void createHDFSFile(Configuration conf)throws IOException{
  92. FileSystem fs = FileSystem.get(conf); //获取文件系统
  93. Path file = new Path("/user/hadoop/text.txt"); //创建文件
  94. FSDataOutputStream outStream = fs.create(file); //获取输出流
  95. outStream.writeUTF("hello##Hdfs");
  96. outStream.close();
  97. fs.close();
  98. }
  99. }

第3关:文件下载

编程要求

完成从HDFS中下载文件的功能

  1. import java.io.*;
  2. import java.sql.Date;
  3. import java.util.Scanner;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.FSDataInputStream;
  6. import org.apache.hadoop.fs.FSDataOutputStream;
  7. import org.apache.hadoop.fs.FileStatus;
  8. import org.apache.hadoop.fs.FileSystem;
  9. import org.apache.hadoop.fs.Path;
  10. public class hdfs {
  11. /**
  12. * 下载文件到本地
  13. * 判断本地路径是否已存在,若已存在,则自动进行重命名
  14. */
  15. public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
  16. FileSystem fs = FileSystem.get(conf);
  17. Path remotePath = new Path(remoteFilePath);
  18. File f = new File(localFilePath);
  19. /*****start*****/
  20. /*在此添加判断文件是否存在的代码,如果文件名存在,自动重命名(在文件名后面加上 _0, _1 ...) */
  21. if (fs.exists(new Path(localFilePath))) {
  22. System.out.println(localFilePath + " 已存在.");
  23. Integer i = 0;
  24. while ( f.exists() ) {
  25. f = new File( localFilePath );
  26. if ( f.exists() ) {
  27. localFilePath = localFilePath + "_" + i ;
  28. break;
  29. }
  30. }
  31. System.out.println("将重新命名为: " + localFilePath);
  32. }
  33. /*****end*****/
  34. /*****start*****/
  35. // 在此添加将文件下载到本地的代码
  36. Path localPath=new Path(localFilePath);
  37. fs.copyToLocalFile(remotePath,localPath);
  38. /*****end*****/
  39. fs.close();
  40. }
  41. /**
  42. * 主函数
  43. */
  44. public static void main(String[] args)throws IOException {
  45. Configuration conf = new Configuration();
  46. createHDFSFile(conf);
  47. String localFilePath = "/tmp/output/text.txt"; // 本地路径
  48. String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
  49. try {
  50. //调用方法下载文件至本地
  51. hdfs.copyToLocal(conf, remoteFilePath, localFilePath);
  52. System.out.println("下载完成");
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. //创建HDFS文件
  58. public static void createHDFSFile(Configuration conf)throws IOException{
  59. FileSystem fs = FileSystem.get(conf); //获取文件系统
  60. Path file = new Path("/user/hadoop/text.txt"); //创建文件
  61. FSDataOutputStream outStream = fs.create(file); //获取输出流
  62. outStream.writeUTF("hello hadoop HDFS www.educoder.net");
  63. outStream.close();
  64. fs.close();
  65. }
  66. }

第4关:使用字符流读取数据

知识点

1.使用字符流读取数据分为三步:

step1:通过Configuration对象获取FileSystem对象;

step2:通过fs获取FSDataInputStream对象;

step3:通过字符流循环读取文件中数据并输出。

编程要求

完成将HDFS中指定文件输出到指定文件中

  1. import java.io.*;
  2. import java.sql.Date;
  3. import java.util.Scanner;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.FSDataInputStream;
  6. import org.apache.hadoop.fs.FSDataOutputStream;
  7. import org.apache.hadoop.fs.FileStatus;
  8. import org.apache.hadoop.fs.FileSystem;
  9. import org.apache.hadoop.fs.Path;
  10. public class hdfs {
  11. /**
  12. * 读取文件内容
  13. */
  14. public static void cat(Configuration conf, String remoteFilePath) throws IOException {
  15. /*****start*****/
  16. //1.读取文件中的数据
  17. FileSystem fs=FileSystem.get(conf);
  18. Path remotePath=new Path(remoteFilePath);
  19. FSDataInputStream in=fs.open(remotePath);
  20. BufferedReader d=new BufferedReader(new InputStreamReader(in));
  21. StringBuffer buffer=new StringBuffer();
  22. String line=null;
  23. while((line=d.readLine())!=null)
  24. {
  25. buffer.append(line);
  26. }
  27. String res = buffer.toString();
  28. //2.将读取到的数据输出到 /tmp/output/text.txt 文件中 提示:可以使用FileWriter
  29. FileWriter f1=new FileWriter("/tmp/output/text.txt");
  30. f1.write(res);
  31. f1.close();
  32. /*****end*****/
  33. }
  34. /**
  35. * 主函数
  36. */
  37. public static void main(String[] args)throws IOException {
  38. Configuration conf = new Configuration();
  39. createHDFSFile(conf);
  40. String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
  41. try {
  42. System.out.println("读取文件: " + remoteFilePath);
  43. hdfs.cat(conf, remoteFilePath);
  44. System.out.println("\n读取完成");
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. //创建HDFS文件
  50. public static void createHDFSFile(Configuration conf)throws IOException{
  51. FileSystem fs = FileSystem.get(conf); //获取文件系统
  52. Path file = new Path("/user/hadoop/text.txt"); //创建文件
  53. FSDataOutputStream outStream = fs.create(file); //获取输出流
  54. outStream.writeUTF("hello hadoop HDFS step4 www.educoder.net");
  55. outStream.close();
  56. fs.close();
  57. }
  58. }

第5关:删除文件

知识点

1.public boolean delete(Path f, Boolean recursive) 永久性删除指定的文件或目录,如果f是一个空目录或者文件,那么recursive的值就会被忽略。只有recursive=true时,一个非空目录及其内容才会被删除(即递归删除所有文件)。

编程要求

删除HDFS中/user/hadoop/text.txt文件

  1. import java.io.*;
  2. import java.sql.Date;
  3. import java.util.Scanner;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.FSDataInputStream;
  6. import org.apache.hadoop.fs.FSDataOutputStream;
  7. import org.apache.hadoop.fs.FileStatus;
  8. import org.apache.hadoop.fs.FileSystem;
  9. import org.apache.hadoop.fs.Path;
  10. public class hdfs {
  11. /**
  12. * 删除文件
  13. */
  14. public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
  15. /*****start*****/
  16. //请在此添加删除文件的代码
  17. FileSystem fs=FileSystem.get(conf);
  18. Path remotePath=new Path(remoteFilePath);
  19. boolean result=fs.delete(remotePath,false);
  20. return result ;
  21. /*****end*****/
  22. }
  23. /**
  24. * 主函数
  25. */
  26. public static void main(String[] args) {
  27. Configuration conf = new Configuration();
  28. String remoteFilePath = "/user/hadoop/text.txt"; // HDFS文件
  29. try {
  30. if (rm(conf, remoteFilePath) ) {
  31. System.out.println("文件删除: " + remoteFilePath);
  32. } else {
  33. System.out.println("操作失败(文件不存在或删除失败)");
  34. }
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }

第6关:删除文件夹

知识点

1.验证目录下是否存在文件:public RemoteIterator<LocatedFileStatus> listFiles(Path f, Boolean recursive),recursivefalse,则返回目录中的文件;如果recursivetrue,则在根目录中返回文件。

2.删除HDFS中的文件和目录:public boolean delete(Path f, Boolean recursive)

编程要求

删除HDFS中/user/hadoop/tmp目录和/user/hadoop/dir目录,删除前,需要判断两个目录是否为空,若不为空则不删除,否则删除。

  1. import java.io.*;
  2. import java.sql.Date;
  3. import java.util.Scanner;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.*;
  6. public class hdfs {
  7. /**
  8. * 判断目录是否为空
  9. * true: 空,false: 非空
  10. */
  11. public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException {
  12. /*****start*****/
  13. //请在此添加判断目录是否为空的代码
  14. FileSystem fs=FileSystem.get(conf);
  15. Path dirPath=new Path(remoteDir);
  16. RemoteIterator<LocatedFileStatus> result=fs.listFiles(dirPath,true);
  17. return !result.hasNext() ;
  18. /*****end*****/
  19. }
  20. /**
  21. * 删除目录
  22. */
  23. public static boolean rmDir(Configuration conf, String remoteDir, boolean recursive) throws IOException {
  24. /*****start*****/
  25. //请在此添加删除目录的代码
  26. FileSystem fs=FileSystem.get(conf);
  27. Path dirPath=new Path(remoteDir);
  28. boolean result=fs.delete(dirPath,false);
  29. return result ;
  30. /*****end*****/
  31. }
  32. /**
  33. * 主函数
  34. */
  35. public static void main(String[] args) {
  36. Configuration conf = new Configuration();
  37. String remoteDir = "/user/hadoop/dir/"; // HDFS目录
  38. String remoteDir1 = "/user/hadoop/tmp/"; // HDFS目录
  39. Boolean forceDelete = false; // 是否强制删除
  40. try {
  41. if ( !isDirEmpty(conf, remoteDir) && !forceDelete ) {
  42. System.out.println("目录不为空,不删除");
  43. } else {
  44. if ( rmDir(conf, remoteDir, forceDelete) ) {
  45. System.out.println("目录已删除: " + remoteDir);
  46. } else {
  47. System.out.println("操作失败");
  48. }
  49. }
  50. if ( !isDirEmpty(conf, remoteDir1) && !forceDelete ) {
  51. System.out.println("目录不为空,不删除");
  52. } else {
  53. if ( rmDir(conf, remoteDir1, forceDelete) ) {
  54. System.out.println("目录已删除: " + remoteDir1);
  55. } else {
  56. System.out.println("操作失败");
  57. }
  58. }
  59. } catch (Exception e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. }

第7关:自定义数据输入流

知识点

1.相关方法

public int read(char[] cbuf,int off,int len)throws IOException

(1)已经读取了指定的字符数, 底层流的 read 方法返回 -1,指示文件末尾(end-of-file),或者底层流的 ready 方法返回 false,指示将阻塞后续的输入请求。

(2) 如果第一次对底层流调用 read 返回 -1(指示文件末尾),则此方法返回 -1,否则此方法返回实际读取的字符数。

编程要求

实现按行读取HDFS中指定文件的方法readLine(),如果读到文件末尾,则返回空,否则返回文件一行的文本,即实现和BufferedReader类的readLine()方法类似的效果。

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.FSDataInputStream;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import java.io.*;
  6. public class MyFSDataInputStream extends FSDataInputStream {
  7. public MyFSDataInputStream(InputStream in) {
  8. super(in);
  9. }
  10. /**
  11. * 实现按行读取
  12. * 每次读入一个字符,遇到"\n"结束,返回一行内容
  13. */
  14. public static String readline(BufferedReader br) throws IOException {
  15. /*****start*****/
  16. //请在此处填充代码实现相关功能
  17. char[] data = new char[1024];
  18. int read = -1;
  19. int off = 0; // 循环执行时,br 每次会从上一次读取结束的位置继续读取,因此该函数里,off 每次都从0开始
  20. while ( (read = br.read(data, off, 1)) != -1 ) {
  21. if (String.valueOf(data[off]).equals("\n") ) {
  22. off += 1;
  23. return String.valueOf(data, 0, read);
  24. }
  25. off += 1;
  26. return String.valueOf(data, 0, read);
  27. }
  28. return null ;
  29. /*****end*****/
  30. }
  31. /**
  32. * 读取文件内容
  33. */
  34. public static void cat(Configuration conf, String remoteFilePath) throws IOException {
  35. FileSystem fs = FileSystem.get(conf);
  36. Path remotePath = new Path(remoteFilePath);
  37. FSDataInputStream in = fs.open(remotePath);
  38. BufferedReader br = new BufferedReader(new InputStreamReader(in));
  39. FileWriter f = new FileWriter("/tmp/output/text.txt");
  40. String line = null;
  41. while ( (line = MyFSDataInputStream.readline(br)) != null ) {
  42. f.write(line);
  43. }
  44. f.close();
  45. br.close();
  46. in.close();
  47. fs.close();
  48. }
  49. /**
  50. * 主函数
  51. */
  52. public static void main(String[] args) {
  53. Configuration conf = new Configuration();
  54. String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
  55. try {
  56. MyFSDataInputStream.cat(conf, remoteFilePath);
  57. } catch (Exception e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/721864
推荐阅读
相关标签
  

闽ICP备14008679号