赞
踩
Flink 中有两个重要的概念:Source 和 Sink。Source 决定了数据从哪里产生,而 Sink 决定了数据将要去往哪里。
Flink 自带了丰富的 Sink,比如 Kafka、CSV 文件、Elasticsearch(ES)、Socket 等。
当我们想要使用当前并未实现的 Sink 函数时,可以进行自定义。
这里以自定义一个写入 Kudu 的 Sink(SinkKudu)为例。
自定义 Sink 需要实现 SinkFunction 接口,或者继承 RichSinkFunction 抽象类。
package TestKudu; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.*; import org.apache.log4j.Logger; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.util.Map; public class SinkKudu extends RichSinkFunction<Map<String, Object>> { private final static Logger logger = Logger.getLogger(SinkKudu.class); private KuduClient client; private KuduTable table; private String kuduMaster; private String tableName; private Schema schema; private KuduSession kuduSession; private ByteArrayOutputStream out; private ObjectOutputStream os; public SinkKudu(String kuduMaster, String tableName) { this.kuduMaster = kuduMaster; this.tableName = tableName; } @Override public void open(Configuration parameters) throws Exception { out = new ByteArrayOutputStream(); os = new ObjectOutputStream(out); client = new KuduClient.KuduClientBuilder(kuduMaster).build(); table = client.openTable(tableName); schema = table.getSchema(); kuduSession = client.newSession(); kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); } public void invoke(Map<String, Object> map) { if (map == null) { return; } try { int columnCount = schema.getColumnCount(); Insert insert = table.newInsert(); PartialRow row = insert.getRow(); for (int i = 0; i < columnCount; i++) { Object value = map.get(schema.getColumnByIndex(i).getName()); insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value); } OperationResponse response = kuduSession.apply(insert); if (response != null) { logger.error(response.getRowError().toString()); } } catch (Exception e) { logger.error(e); } } @Override public void close() throws Exception { try { kuduSession.close(); client.close(); os.close(); out.close(); } catch (Exception e) { logger.error(e); } } // 
插入数据 private void insertData(PartialRow row, Type type, String columnName, Object value) throws IOException { try { switch (type) { case STRING: row.addString(columnName, value.toString()); return; case INT32: row.addInt(columnName, Integer.valueOf(value.toString())); return; case INT64: row.addLong(columnName, Long.valueOf(value.toString())); return; case DOUBLE: row.addDouble(columnName, Double.valueOf(value.toString())); return; case BOOL: row.addBoolean(columnName, (Boolean) value); return; // case INT8: // row.addByte(columnName, (byte) value); // return; // case INT16: // row.addShort(columnName, (short) value); // return; case BINARY: os.writeObject(value); row.addBinary(columnName, out.toByteArray()); return; case FLOAT: row.addFloat(columnName, Float.valueOf(String.valueOf(value))); return; default: throw new UnsupportedOperationException("Unknown type " + type); } } catch (Exception e) { logger.error("数据插入异常", e); } } }
SinkKudu 类继承了 RichSinkFunction,并重写了 open、invoke 和 close 方法:在 open 中初始化 Kudu 客户端、表和会话;在 invoke 中把每条记录转换为 Insert 并写入 Kudu;最后在 close 中关闭会话、客户端等资源。
3、测试样例
package TestKudu; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.HashMap; import java.util.Map; public class SinkTest { public static void main(String []args) throws Exception { // 初始化 flink 执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 生成数据源 DataStreamSource<UserInfo> dataSource = env.fromElements(new UserInfo("001", "Jack", 18), new UserInfo("002", "Rose", 20), new UserInfo("003", "Cris", 22), new UserInfo("004", "Lily", 19), new UserInfo("005", "Lucy", 21), new UserInfo("006", "Json", 24)); // 转换数据 map SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(new MapFunction<UserInfo, Map<String, Object>>() { public Map<String, Object> map(UserInfo value) throws Exception { Map<String, Object> map = new HashMap<String, Object>(); map.put("userid", value.userid); map.put("name", value.name); map.put("age", value.age); return map; } }); // sink 到 kudu String kuduMaster = ""; String tableInfo = ""; mapSource.addSink(new SinkKudu(kuduMaster, tableInfo)); env.execute("sink-test"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。