A Look at Flink's FsCheckpointStorage
Published: 2019-06-27

This post takes a look at Flink's FsCheckpointStorage.

CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {

    boolean supportsHighlyAvailableStorage();

    boolean hasDefaultSavepointLocation();

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

    CheckpointStorageLocation initializeLocationForSavepoint(
            long checkpointId,
            @Nullable String externalLocationPointer) throws IOException;

    CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException;

    CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}

  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams: supportsHighlyAvailableStorage reports whether the backend supports highly available storage; hasDefaultSavepointLocation reports whether a default savepoint location is configured; resolveCheckpoint resolves a checkpoint pointer into a CompletedCheckpointStorageLocation; initializeLocationForCheckpoint initializes the storage location for the checkpoint with the given checkpointId; initializeLocationForSavepoint initializes the storage location for a savepoint from the checkpointId and an optional external location pointer; resolveCheckpointStorageLocation resolves a CheckpointStorageLocationReference into a CheckpointStreamFactory; createTaskOwnedStateStream opens a stream for persisting task-owned checkpoint state

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /** The prefix of the directory containing the data exclusive to a checkpoint. */
    public static final String CHECKPOINT_DIR_PREFIX = "chk-";

    /** The name of the directory for shared checkpoint state. */
    public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

    /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
    public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

    /** The name of the metadata files in checkpoints / savepoints. */
    public static final String METADATA_FILE_NAME = "_metadata";

    /** The magic number that is put in front of any reference. */
    private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

    // ------------------------------------------------------------------------
    //  Fields and properties
    // ------------------------------------------------------------------------

    /** The jobId, written into the generated savepoint directories. */
    private final JobID jobId;

    /** The default location for savepoints. Null, if none is configured. */
    @Nullable
    private final Path defaultSavepointDirectory;

    @Override
    public boolean hasDefaultSavepointLocation() {
        return defaultSavepointDirectory != null;
    }

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
        return resolveCheckpointPointer(checkpointPointer);
    }

    /**
     * Creates a file system based storage location for a savepoint.
     *
     * <p>This methods implements the logic that decides which location to use (given optional
     * parameters for a configured location and a location passed for this specific savepoint)
     * and how to name and initialize the savepoint directory.
     *
     * @param externalLocationPointer    The target location pointer for the savepoint.
     *                                   Must be a valid URI. Null, if not supplied.
     * @param checkpointId               The checkpoint ID of the savepoint.
     *
     * @return The checkpoint storage location for the savepoint.
     *
     * @throws IOException Thrown if the target directory could not be created.
     */
    @Override
    public CheckpointStorageLocation initializeLocationForSavepoint(
            @SuppressWarnings("unused") long checkpointId,
            @Nullable String externalLocationPointer) throws IOException {

        // determine where to write the savepoint to

        final Path savepointBasePath;
        if (externalLocationPointer != null) {
            savepointBasePath = new Path(externalLocationPointer);
        }
        else if (defaultSavepointDirectory != null) {
            savepointBasePath = defaultSavepointDirectory;
        }
        else {
            throw new IllegalArgumentException("No savepoint location given and no default location configured.");
        }

        // generate the savepoint directory

        final FileSystem fs = savepointBasePath.getFileSystem();
        final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

        Exception latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

            try {
                if (fs.mkdirs(path)) {
                    // we make the path qualified, to make it independent of default schemes and authorities
                    final Path qp = path.makeQualified(fs);

                    return createSavepointLocation(fs, qp);
                }
            } catch (Exception e) {
                latestException = e;
            }
        }

        throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
    }

    protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

    //......
}

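  • AbstractFsCheckpointStorage implements the hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods of the CheckpointStorage interface
  • resolveCheckpoint delegates to resolveCheckpointPointer, which does two things: it resolves the checkpoint/savepoint path and the path of its metadata file, obtains the FileStatus of each, and then creates an FsCompletedCheckpointStorageLocation
  • initializeLocationForSavepoint creates a CheckpointStorageLocation for a savepoint. It uses externalLocationPointer when one is given, falls back to defaultSavepointDirectory when the pointer is null, and throws an IllegalArgumentException when neither is available. It then tries up to 10 times to create a randomly named directory with the prefix "savepoint-" plus the first 6 characters of the job ID, and hands the qualified path to the abstract method createSavepointLocation, which subclasses implement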
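
The location choice and retry loop of initializeLocationForSavepoint can be sketched without any Flink dependency. The following is only an illustration built on java.nio.file: the class name SavepointDirSketch, the helpers chooseBase and createSavepointDir, and the UUID-based random suffix (a stand-in for FileUtils.getRandomFilename) are hypothetical and not part of Flink. Unlike Flink's FileSystem.mkdirs, Files.createDirectory fails if the directory already exists, so a name collision here simply triggers the next attempt.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

class SavepointDirSketch {

    /** Explicit pointer wins, then the configured default, otherwise fail. */
    static Path chooseBase(String externalLocationPointer, Path defaultSavepointDirectory) {
        if (externalLocationPointer != null) {
            return Paths.get(externalLocationPointer);
        } else if (defaultSavepointDirectory != null) {
            return defaultSavepointDirectory;
        }
        throw new IllegalArgumentException(
                "No savepoint location given and no default location configured.");
    }

    /** Tries up to 10 random names under base and returns the first directory created. */
    static Path createSavepointDir(Path base, String jobId) throws IOException {
        final String prefix = "savepoint-" + jobId.substring(0, 6) + '-';
        IOException latestException = null;

        for (int attempt = 0; attempt < 10; attempt++) {
            // Hypothetical random suffix; Flink uses FileUtils.getRandomFilename(prefix).
            String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
            Path candidate = base.resolve(prefix + suffix);
            try {
                Files.createDirectories(base);          // make sure the base exists
                Files.createDirectory(candidate);       // fails if the name is taken
                return candidate.toAbsolutePath();      // rough analogue of makeQualified
            } catch (IOException e) {
                latestException = e;                    // remember and retry with a new name
            }
        }
        throw new IOException("Failed to create savepoint directory at " + base, latestException);
    }

    public static void main(String[] args) throws IOException {
        Path defaultDir = Files.createTempDirectory("savepoints");
        Path base = chooseBase(null, defaultDir);       // no explicit pointer, use the default
        System.out.println(createSavepointDir(base, "a1b2c3d4e5f6"));
    }
}
```

Keeping the last exception and attaching it as the cause of the final IOException mirrors the original code, so a caller can still see why the last attempt failed.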

FsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public class FsCheckpointStorage extends AbstractFsCheckpointStorage {

    private final FileSystem fileSystem;

    private final Path checkpointsDirectory;

    private final Path sharedStateDirectory;

    private final Path taskOwnedStateDirectory;

    private final int fileSizeThreshold;

    public FsCheckpointStorage(
            Path checkpointBaseDirectory,
            @Nullable Path defaultSavepointDirectory,
            JobID jobId,
            int fileSizeThreshold) throws IOException {

        this(checkpointBaseDirectory.getFileSystem(),
                checkpointBaseDirectory,
                defaultSavepointDirectory,
                jobId,
                fileSizeThreshold);
    }

    public FsCheckpointStorage(
            FileSystem fs,
            Path checkpointBaseDirectory,
            @Nullable Path defaultSavepointDirectory,
            JobID jobId,
            int fileSizeThreshold) throws IOException {

        super(jobId, defaultSavepointDirectory);

        checkArgument(fileSizeThreshold >= 0);

        this.fileSystem = checkNotNull(fs);
        this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
        this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
        this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
        this.fileSizeThreshold = fileSizeThreshold;

        // initialize the dedicated directories
        fileSystem.mkdirs(checkpointsDirectory);
        fileSystem.mkdirs(sharedStateDirectory);
        fileSystem.mkdirs(taskOwnedStateDirectory);
    }

    // ------------------------------------------------------------------------

    public Path getCheckpointsDirectory() {
        return checkpointsDirectory;
    }

    // ------------------------------------------------------------------------
    //  CheckpointStorage implementation
    // ------------------------------------------------------------------------

    @Override
    public boolean supportsHighlyAvailableStorage() {
        return true;
    }

    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        checkArgument(checkpointId >= 0);

        // prepare all the paths needed for the checkpoints
        final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

        // create the checkpoint exclusive directory
        fileSystem.mkdirs(checkpointDir);

        return new FsCheckpointStorageLocation(
                fileSystem,
                checkpointDir,
                sharedStateDirectory,
                taskOwnedStateDirectory,
                CheckpointStorageLocationReference.getDefault(),
                fileSizeThreshold);
    }

    @Override
    public CheckpointStreamFactory resolveCheckpointStorageLocation(
            long checkpointId,
            CheckpointStorageLocationReference reference) throws IOException {

        if (reference.isDefaultReference()) {
            // default reference, construct the default location for that particular checkpoint
            final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

            return new FsCheckpointStorageLocation(
                    fileSystem,
                    checkpointDir,
                    sharedStateDirectory,
                    taskOwnedStateDirectory,
                    reference,
                    fileSizeThreshold);
        }
        else {
            // location encoded in the reference
            final Path path = decodePathFromReference(reference);

            return new FsCheckpointStorageLocation(
                    path.getFileSystem(),
                    path,
                    path,
                    path,
                    reference,
                    fileSizeThreshold);
        }
    }

    @Override
    public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
        return new FsCheckpointStateOutputStream(
                taskOwnedStateDirectory,
                fileSystem,
                FsCheckpointStreamFactory.DEFAULT_WRITE_BUFFER_SIZE,
                fileSizeThreshold);
    }

    @Override
    protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
        final CheckpointStorageLocationReference reference = encodePathAsReference(location);
        return new FsCheckpointStorageLocation(fs, location, location, location, reference, fileSizeThreshold);
    }
}

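  • FsCheckpointStorage extends AbstractFsCheckpointStorage and implements its abstract createSavepointLocation method, which returns an FsCheckpointStorageLocation
  • FsCheckpointStorage also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream
  • supportsHighlyAvailableStorage simply returns true; initializeLocationForCheckpoint creates the checkpoint's exclusive directory and returns an FsCheckpointStorageLocation with the default reference; resolveCheckpointStorageLocation also returns an FsCheckpointStorageLocation, built from the job's checkpoint directories for a default reference and from the path decoded from the reference otherwise; createTaskOwnedStateStream returns an FsCheckpointStateOutputStream that writes to the task-owned state directory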
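
The two branches of resolveCheckpointStorageLocation can be illustrated with plain strings and byte arrays. This is a sketch under stated assumptions, not Flink's implementation: the excerpts above do not show the bodies of createCheckpointDirectory, encodePathAsReference, or decodePathFromReference, so the "chk-" + checkpointId naming and the "magic number followed by the UTF-8 path" byte layout are inferred from the constants and their comments. The class LocationResolveSketch, its Location holder, and the use of a null reference to stand for the default reference are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class LocationResolveSketch {

    static final String CHECKPOINT_DIR_PREFIX = "chk-";
    static final String CHECKPOINT_SHARED_STATE_DIR = "shared";
    static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";
    static final byte[] REFERENCE_MAGIC_NUMBER = {0x05, 0x5F, 0x3F, 0x18};

    /** Hypothetical holder for the three directories of a storage location. */
    static final class Location {
        final String exclusiveDir;
        final String sharedDir;
        final String taskOwnedDir;

        Location(String exclusiveDir, String sharedDir, String taskOwnedDir) {
            this.exclusiveDir = exclusiveDir;
            this.sharedDir = sharedDir;
            this.taskOwnedDir = taskOwnedDir;
        }
    }

    /** Assumed layout: magic number first, then the path as UTF-8 bytes. */
    static byte[] encodePathAsReference(String path) {
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        int n = REFERENCE_MAGIC_NUMBER.length;
        byte[] reference = Arrays.copyOf(REFERENCE_MAGIC_NUMBER, n + pathBytes.length);
        System.arraycopy(pathBytes, 0, reference, n, pathBytes.length);
        return reference;
    }

    /** Checks the magic prefix, then reads the remaining bytes back as the path. */
    static String decodePathFromReference(byte[] reference) {
        int n = REFERENCE_MAGIC_NUMBER.length;
        if (reference.length <= n
                || !Arrays.equals(Arrays.copyOf(reference, n), REFERENCE_MAGIC_NUMBER)) {
            throw new IllegalArgumentException("Reference does not start with the magic number.");
        }
        return new String(reference, n, reference.length - n, StandardCharsets.UTF_8);
    }

    /** A null reference stands in for the default reference in this sketch. */
    static Location resolve(String jobCheckpointsDir, long checkpointId, byte[] reference) {
        if (reference == null) {
            // default: exclusive chk-<id> directory plus the job-wide shared/taskowned directories
            return new Location(
                    jobCheckpointsDir + "/" + CHECKPOINT_DIR_PREFIX + checkpointId,
                    jobCheckpointsDir + "/" + CHECKPOINT_SHARED_STATE_DIR,
                    jobCheckpointsDir + "/" + CHECKPOINT_TASK_OWNED_STATE_DIR);
        }
        // encoded location (e.g. a savepoint): one directory serves all three roles
        String path = decodePathFromReference(reference);
        return new Location(path, path, path);
    }

    public static void main(String[] args) {
        Location checkpoint = resolve("/checkpoints/job-1", 7L, null);
        System.out.println(checkpoint.exclusiveDir);

        byte[] reference = encodePathAsReference("/savepoints/savepoint-a1b2c3-0123456789ab");
        Location savepoint = resolve("/checkpoints/job-1", 7L, reference);
        System.out.println(savepoint.exclusiveDir);
    }
}
```

The point of the encoded branch is that a savepoint is self-contained: its exclusive, shared, and task-owned state all live under the single directory carried in the reference, which matches how createSavepointLocation passes location three times.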

Summary

  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams; AbstractFsCheckpointStorage implements its hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods and declares the abstract method createSavepointLocation
  • FsCheckpointStorage extends AbstractFsCheckpointStorage, implements createSavepointLocation, and also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream
  • In FsCheckpointStorage, supportsHighlyAvailableStorage simply returns true; initializeLocationForCheckpoint and resolveCheckpointStorageLocation both return an FsCheckpointStorageLocation; createTaskOwnedStateStream returns an FsCheckpointStateOutputStream

Reposted from: https://my.oschina.net/go4it/blog/2989073
