I've played with MapReduce quite a bit; now I'm discovering YARN applications, a Hadoop 2 feature that lets a client distribute its own code over the nodes of the cluster, not just MapReduce jobs.
I found a nice example, which I tinkered with and adapted for Hadoop 2.6.0.
We start with the client. It first connects to YARN as a client and asks it to create an application. It then builds the ApplicationMaster's launch context (command, jar, environment) and fills in the submission context (name, queue, resources) for the jar we want to run on the nodes.
Once all of that is done, we submit the application to the ResourceManager, which starts it on a node of the cluster.
The launch command of the ApplicationMaster, for instance:
```java
// Launch context of the ApplicationMaster container.
// <LOG_DIR> is replaced by the NodeManager with the container's log directory.
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
amContainer.setCommands(Collections.singletonList(
        "$JAVA_HOME/bin/java -Xmx256M yarnhello.ApplicationMaster"
        + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));
```
Then comes the submission itself:
```java
ApplicationId appId = appContext.getApplicationId();
System.out.println("Client: Submitting " + appId);
yarnClient.submitApplication(appContext);
```
The complete client follows. It submits the ApplicationMaster, which YARN starts in a container on one of the cluster nodes. The ApplicationMaster then requests as many containers as its limit allows (containerCount = 3) and launches my Coucou class in each of them:
```java
package yarnhello;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class Client {
    private YarnConfiguration conf;

    public static void main(String[] args) {
        System.out.println("Client: Initializing");
        try {
            new Client().run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void run() throws Exception {
        this.conf = new YarnConfiguration();

        // Connect to the ResourceManager as a YARN client
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Ask the ResourceManager for a new application
        YarnClientApplication app = yarnClient.createApplication();

        // Launch context of the ApplicationMaster container: the command line...
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList(
                "$JAVA_HOME/bin/java -Xmx256M yarnhello.ApplicationMaster"
                + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));

        // ...the jar, localized from HDFS into the container's working directory...
        LocalResource appJar = Records.newRecord(LocalResource.class);
        Utils.setUpLocalResource(Utils.YARNAPP_JAR_PATH, appJar, conf);
        amContainer.setLocalResources(Collections.singletonMap(Utils.YARNAPP_JAR_NAME, appJar));

        // ...and the environment (CLASSPATH)
        Map<String, String> env = new HashMap<String, String>();
        Utils.setUpEnv(env, conf);
        amContainer.setEnvironment(env);

        // Resources of the ApplicationMaster container: 256 MB, 1 vcore
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(256);
        capability.setVirtualCores(1);

        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("YARNHELLO");
        appContext.setQueue("default");
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        // Submit the application
        ApplicationId appId = appContext.getApplicationId();
        System.out.println("Client: Submitting " + appId);
        yarnClient.submitApplication(appContext);

        // Poll the ResourceManager once per second until a terminal state is reached
        ApplicationReport report = yarnClient.getApplicationReport(appId);
        YarnApplicationState state = report.getYarnApplicationState();
        while (state != YarnApplicationState.FINISHED
                && state != YarnApplicationState.KILLED
                && state != YarnApplicationState.FAILED) {
            Thread.sleep(1000L);
            report = yarnClient.getApplicationReport(appId);
            state = report.getYarnApplicationState();
        }
        System.out.println("Client: Finished " + appId + " with state " + state);
    }
}
```
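The client's polling loop waits forever if the application never reaches a terminal state. Here is a minimal sketch of the same loop with a deadline. It is written in plain Java so it runs without a cluster. The hypothetical `AppState` enum mirrors the `YarnApplicationState` values the client tests, and a `Supplier` stands in for `yarnClient.getApplicationReport(appId).getYarnApplicationState()`.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.hadoop.yarn.api.records.YarnApplicationState
enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

class PollWithTimeout {
    // Same terminal states as the client's while condition
    static final Set<AppState> TERMINAL = EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    // Polls every intervalMs until a terminal state or until timeoutMs elapses.
    // Returns the last state seen (still non-terminal if the deadline was hit).
    static AppState waitForCompletion(Supplier<AppState> poll, long intervalMs, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        AppState state = poll.get();
        while (!TERMINAL.contains(state) && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop waiting
                break;
            }
            state = poll.get();
        }
        return state;
    }

    public static void main(String[] args) {
        // Simulated sequence of application reports
        Iterator<AppState> reports = Arrays.asList(
                AppState.ACCEPTED, AppState.RUNNING, AppState.RUNNING, AppState.FINISHED).iterator();
        AppState last = waitForCompletion(reports::next, 10, 5000);
        System.out.println("Final state: " + last); // Final state: FINISHED
    }
}
```

In the real client, the supplier would wrap the `getApplicationReport` call and its checked exceptions. On a timeout, `yarnClient.killApplication(appId)` could then be called.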
Once started, the ApplicationMaster prepares the launch context of my containers, which run my Java jar:
```java
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
ctx.setCommands(Collections.singletonList(
        "$JAVA_HOME/bin/java -Xmx256M yarnhello.Coucou"
        + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));
// container: one of the containers allocated by the ResourceManager
nmClient.startContainer(container, ctx);
```
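The ApplicationMaster requests 128 MB per container but starts each JVM with -Xmx256M, so the heap cap is larger than the container. The client has a similar issue: it uses -Xmx256M for a 256 MB ApplicationMaster container, leaving no headroom for non-heap memory. The scheduler normally rounds requests up to yarn.scheduler.minimum-allocation-mb (1024 MB by default), and Coucou uses very little memory, so this goes unnoticed here.

The sketch below derives -Xmx from the requested memory instead. The helper name `javaCommand` and the 0.8 ratio are assumptions of mine, not YARN settings.

```java
class ContainerCommand {
    // Assumed fraction of the container memory given to the Java heap;
    // the rest is left for metaspace, thread stacks and native memory.
    static final double HEAP_RATIO = 0.8;

    // Hypothetical helper: builds the same kind of command as the snippet above,
    // with -Xmx derived from the memory requested for the container.
    static String javaCommand(String mainClass, int containerMemoryMb) {
        int heapMb = (int) (containerMemoryMb * HEAP_RATIO);
        return "$JAVA_HOME/bin/java -Xmx" + heapMb + "m " + mainClass
                + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr";
    }

    public static void main(String[] args) {
        System.out.println(javaCommand("yarnhello.Coucou", 128));
        // $JAVA_HOME/bin/java -Xmx102m yarnhello.Coucou 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
    }
}
```

Passing the same int to `capability.setMemory(...)` and to `javaCommand(...)` keeps the two values from drifting apart.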
It connects to the ResourceManager (through AMRMClientAsync) and to the NodeManagers (through NMClient). The complete ApplicationMaster:
```java
package yarnhello;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
    private YarnConfiguration conf = new YarnConfiguration();
    private NMClient nmClient;
    // Coucou containers still running; decremented by the callback thread
    private int containerCount = 3;

    public static void main(String[] args) {
        System.out.println("AppMaster: Initializing");
        try {
            new ApplicationMaster().run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() throws Exception {
        // Client used to start containers on the NodeManagers
        nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();

        // Asynchronous ResourceManager client: heartbeat every 1000 ms, callbacks into this class
        AMRMClientAsync<ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this);
        rmClient.init(conf);
        rmClient.start();
        rmClient.registerApplicationMaster("", 0, "");
        System.out.println("AppMaster: Registered");

        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(0);

        // Resources of each Coucou container: 128 MB, 1 vcore
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(128);
        capability.setVirtualCores(1);

        System.out.println("AppMaster: Requesting " + containerCount + " Containers");
        for (int i = 0; i < containerCount; i++) {
            // No node or rack constraint: the scheduler picks the nodes
            rmClient.addContainerRequest(new ContainerRequest(capability, null, null, priority));
        }

        // Wait until every container has completed
        while (!containersFinished()) {
            Thread.sleep(100L);
        }

        System.out.println("AppMaster: Unregistered");
        rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    }

    // synchronized so the main thread sees the decrements made by the callback thread
    private synchronized boolean containersFinished() {
        return containerCount == 0;
    }

    @Override
    public void onContainersAllocated(List<Container> containers) {
        for (Container container : containers) {
            try {
                nmClient.startContainer(container, initContainer());
                System.err.println("AppMaster: Container launched " + container.getId());
            } catch (Exception e) {
                System.err.println("AppMaster: Container not launched " + container.getId());
                e.printStackTrace();
            }
        }
    }

    // Launch context of a Coucou container: same jar and CLASSPATH as the ApplicationMaster.
    // Note: -Xmx256M is larger than the 128 MB requested per container in run().
    private ContainerLaunchContext initContainer() throws IOException {
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList(
                "$JAVA_HOME/bin/java -Xmx256M yarnhello.Coucou"
                + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));

        LocalResource appJar = Records.newRecord(LocalResource.class);
        Utils.setUpLocalResource(Utils.YARNAPP_JAR_PATH, appJar, conf);
        ctx.setLocalResources(Collections.singletonMap(Utils.YARNAPP_JAR_NAME, appJar));

        Map<String, String> env = new HashMap<String, String>();
        Utils.setUpEnv(env, conf);
        ctx.setEnvironment(env);
        return ctx;
    }

    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        for (ContainerStatus status : statuses) {
            System.err.println("AppMaster: Container finished " + status.getContainerId());
            synchronized (this) {
                containerCount--;
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onNodesUpdated(List<NodeReport> updatedNodes) {
    }

    @Override
    public void onShutdownRequest() {
    }

    @Override
    public float getProgress() {
        return 0.0F;
    }
}
```
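The ApplicationMaster waits by polling `containerCount` every 100 ms. The completion callbacks arrive on another thread, which is why the counter must be read and written under the same lock. A `java.util.concurrent.CountDownLatch` is an alternative that blocks without polling.

The sketch below is a minimal version of that idea. A plain thread simulates the AMRMClientAsync callback thread, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical replacement for the int counter + sleep loop of the ApplicationMaster
class CompletionTracker {
    private final CountDownLatch remaining;

    CompletionTracker(int containerCount) {
        this.remaining = new CountDownLatch(containerCount);
    }

    // Would be called from onContainersCompleted, once per ContainerStatus
    void onContainerCompleted(String containerId) {
        System.err.println("AppMaster: Container finished " + containerId);
        remaining.countDown();
    }

    // Blocks the caller until every container has completed; returns false on timeout
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        final CompletionTracker tracker = new CompletionTracker(3);
        // A plain thread simulates the callback thread reporting three completions
        Thread callbacks = new Thread(() -> {
            for (int i = 2; i <= 4; i++) {
                tracker.onContainerCompleted("container_" + i);
            }
        });
        callbacks.start();
        System.out.println("All finished: " + tracker.awaitAll(5, TimeUnit.SECONDS)); // All finished: true
    }
}
```

A latch cannot be re-armed. It therefore suits this fixed number of containers, but not a scheme that requests replacement containers after failures.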
My dummy Coucou class, which can contain whatever I want:
```java
package yarnhello;

// Dummy payload run in each container: it only prints three lines
// ("container init", "hello everyone, welcome to the cuckoo box", "end of container execution")
public class Coucou {
    public static void main(String[] args) {
        System.out.println("Initialisation du conteneur");
        System.out.println("Salut tout le monde ! bienvenu dans la boite à coucou");
        System.out.println("Fin d'execution du conteneur");
    }
}
```
So in the end it launches one Coucou container on each of the three nodes, stargate, itx01 and itx02. itx02 also hosts the ApplicationMaster (container ..._000001), as the aggregated logs at the end of this post show.
Edit: a utility class, which may come in handy:
```java
package yarnhello;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class Utils {
    // Name of the jar in the container's working directory, and its location on HDFS
    public static final String YARNAPP_JAR_NAME = "YARNHELLO.jar";
    public static final Path YARNAPP_JAR_PATH = new Path("/apps/YARNHELLO.jar");

    // Builds the container CLASSPATH: existing CLASSPATH, ./* (the localized jar),
    // the YARN application classpath, then log4j.properties.
    // <CPS> (CLASS_PATH_SEPARATOR) is replaced by ':' or ';' by the NodeManager.
    public static void setUpEnv(Map<String, String> env, YarnConfiguration conf) {
        StringBuilder classPath = new StringBuilder(Environment.CLASSPATH.$$())
                .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
        for (String entry : conf.getStrings("yarn.application.classpath",
                YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
            classPath.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
            classPath.append(entry.trim());
        }
        classPath.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./log4j.properties");
        // On a MiniYARNCluster (tests), also append the current JVM's classpath
        if (conf.getBoolean("yarn.is.minicluster", false)) {
            classPath.append(':');
            classPath.append(System.getProperty("java.class.path"));
        }
        env.put("CLASSPATH", classPath.toString());
    }

    // Describes an HDFS file as a LocalResource, which YARN copies into the container's working directory
    public static void setUpLocalResource(Path path, LocalResource resource, YarnConfiguration conf)
            throws IOException {
        Path qualified = FileContext.getFileContext().makeQualified(path);
        FileStatus status = FileSystem.get(conf).getFileStatus(qualified);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(qualified));
        // The NodeManager checks the timestamp against the file when localizing it
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        resource.setVisibility(LocalResourceVisibility.PUBLIC);
    }
}
```
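To see what `setUpEnv` produces, here is a plain-Java sketch of the same string construction with the Hadoop constants written out. `{{CLASSPATH}}` is what `Environment.CLASSPATH.$$()` returns, and `<CPS>` is `ApplicationConstants.CLASS_PATH_SEPARATOR`.

`expandForLinux` is my approximation (an assumption, not Hadoop code) of the substitution the NodeManager performs on Linux at launch. It also covers the `<LOG_DIR>` marker of the launch commands. The entries passed in `main` are made-up examples, not this cluster's yarn.application.classpath.

```java
import java.util.Arrays;
import java.util.List;

class ClasspathSketch {
    static final String CLASSPATH_REF = "{{CLASSPATH}}"; // value of Environment.CLASSPATH.$$()
    static final String CPS = "<CPS>";                   // value of ApplicationConstants.CLASS_PATH_SEPARATOR

    // Same order as setUpEnv: existing CLASSPATH, ./* (the localized jar), YARN entries, log4j.properties
    static String buildClasspath(List<String> yarnEntries) {
        StringBuilder sb = new StringBuilder(CLASSPATH_REF).append(CPS).append("./*");
        for (String entry : yarnEntries) {
            sb.append(CPS).append(entry.trim());
        }
        sb.append(CPS).append("./log4j.properties");
        return sb.toString();
    }

    // Approximation of the NodeManager's expansion on Linux:
    // <LOG_DIR> -> container log dir, <CPS> -> ':', {{VAR}} -> $VAR
    static String expandForLinux(String value, String containerLogDir) {
        return value.replace("<LOG_DIR>", containerLogDir)
                .replace(CPS, ":")
                .replace("{{", "$")
                .replace("}}", "");
    }

    public static void main(String[] args) {
        // Made-up entries standing in for yarn.application.classpath
        String cp = buildClasspath(Arrays.asList(" /etc/hadoop/conf", "/opt/hadoop/share/hadoop/yarn/* "));
        System.out.println(cp);
        System.out.println(expandForLinux(cp, "/tmp/logs/container_01"));
        // $CLASSPATH:./*:/etc/hadoop/conf:/opt/hadoop/share/hadoop/yarn/*:./log4j.properties
    }
}
```

Building the string with the markers, rather than with ':' directly, is what lets a client on one OS submit to NodeManagers on another.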
I'm currently dissecting Hortonworks' distributed shell example; it's a bit beefier.
Here is a run. The old jar is deleted from HDFS, the new one is copied to /apps, the client is launched, and the aggregated logs are fetched with yarn logs:
```
15/09/08 20:36:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/08 20:36:42 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /apps/YARNHELLO.jar
hduser@stargate:~$ hadoop fs -copyFromLocal YARNHELLO.jar /apps
15/09/08 20:36:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hduser@stargate:~$ hadoop jar YARNHELLO.jar yarnhello.Client
Client: Initializing
15/09/08 20:37:13 INFO client.RMProxy: Connecting to ResourceManager at stargate/192.168.0.11:8032
15/09/08 20:37:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Client: Submitting application_1441649620937_0009
15/09/08 20:37:14 INFO impl.YarnClientImpl: Submitted application application_1441649620937_0009
Client: Finished application_1441649620937_0009 with state FINISHED
hduser@stargate:~$ yarn logs -applicationId application_1441649620937_0009
15/09/08 20:37:59 INFO client.RMProxy: Connecting to ResourceManager at stargate/192.168.0.11:8032
15/09/08 20:37:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Container: container_1441649620937_0009_01_000003 on itx01_38328
==================================================================
LogType:stderr
Log Upload Time:8-sept.-2015 20:37:28
LogLength:0
Log Contents:
LogType:stdout
Log Upload Time:8-sept.-2015 20:37:28
LogLength:112
Log Contents:
Initialisation du conteneur
Salut tout le monde ! bienvenu dans la boite à coucou
Fin d'execution du conteneur
Container: container_1441649620937_0009_01_000004 on itx02_47715
==================================================================
LogType:stderr
Log Upload Time:8-sept.-2015 20:37:27
LogLength:0
Log Contents:
LogType:stdout
Log Upload Time:8-sept.-2015 20:37:27
LogLength:112
Log Contents:
Initialisation du conteneur
Salut tout le monde ! bienvenu dans la boite à coucou
Fin d'execution du conteneur
Container: container_1441649620937_0009_01_000001 on itx02_47715
==================================================================
LogType:stderr
Log Upload Time:8-sept.-2015 20:37:27
LogLength:1391
Log Contents:
15/09/08 20:37:31 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
15/09/08 20:37:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/08 20:37:32 INFO client.RMProxy: Connecting to ResourceManager at stargate/192.168.0.11:8030
15/09/08 20:37:36 INFO impl.AMRMClientImpl: Received new token for : stargate:42554
15/09/08 20:37:36 INFO impl.AMRMClientImpl: Received new token for : itx01:38328
15/09/08 20:37:36 INFO impl.AMRMClientImpl: Received new token for : itx02:47715
15/09/08 20:37:38 INFO impl.ContainerManagementProtocolProxy: Opening proxy : stargate:42554
AppMaster: Container launched container_1441649620937_0009_01_000002
15/09/08 20:37:38 INFO impl.ContainerManagementProtocolProxy: Opening proxy : itx01:38328
AppMaster: Container launched container_1441649620937_0009_01_000003
15/09/08 20:37:39 INFO impl.ContainerManagementProtocolProxy: Opening proxy : itx02:47715
AppMaster: Container launched container_1441649620937_0009_01_000004
AppMaster: Container finished container_1441649620937_0009_01_000002
AppMaster: Container finished container_1441649620937_0009_01_000003
AppMaster: Container finished container_1441649620937_0009_01_000004
15/09/08 20:37:40 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
LogType:stdout
Log Upload Time:8-sept.-2015 20:37:27
LogLength:105
Log Contents:
AppMaster: Initializing
AppMaster: Registered
AppMaster: Requesting 3 Containers
AppMaster: Unregistered
Container: container_1441649620937_0009_01_000002 on stargate_42554
=====================================================================
LogType:stderr
Log Upload Time:8-sept.-2015 20:37:28
LogLength:0
Log Contents:
LogType:stdout
Log Upload Time:8-sept.-2015 20:37:28
LogLength:112
Log Contents:
Initialisation du conteneur
Salut tout le monde ! bienvenu dans la boite à coucou
Fin d'execution du conteneur
```
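To check the placement without reading the whole output, here is a small sketch that counts, per node, the "Container: <id> on <host>_<port>" headers printed by yarn logs. It relies only on the header format visible in the log above, which other Hadoop versions may print differently. The class name is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ContainerPlacement {
    // Matches headers such as "Container: container_1441649620937_0009_01_000003 on itx01_38328";
    // the greedy host group backtracks to the last '_' before the port
    private static final Pattern HEADER =
            Pattern.compile("^Container: (container_\\S+) on (\\S+)_(\\d+)$", Pattern.MULTILINE);

    // Returns host -> number of containers (ApplicationMaster included), sorted by host name
    static Map<String, Integer> countPerHost(String yarnLogsOutput) {
        Map<String, Integer> counts = new TreeMap<>();
        Matcher m = HEADER.matcher(yarnLogsOutput);
        while (m.find()) {
            counts.merge(m.group(2), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The four container headers of the run above
        String sample = String.join("\n",
                "Container: container_1441649620937_0009_01_000003 on itx01_38328",
                "Container: container_1441649620937_0009_01_000004 on itx02_47715",
                "Container: container_1441649620937_0009_01_000001 on itx02_47715",
                "Container: container_1441649620937_0009_01_000002 on stargate_42554");
        System.out.println(countPerHost(sample)); // {itx01=1, itx02=2, stargate=1}
    }
}
```

On this run, itx02 counts two containers: the ApplicationMaster (container 000001) and one Coucou container.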